use std::net::{SocketAddr, TcpStream};
use std::time::Duration;
use crate::reducer::effect::EffectResult;
use crate::reducer::event::{AgentEvent, PipelineEvent};
use crate::reducer::state::ConnectivityState;
use crate::reducer::ui_event::UIEvent;
/// `(host, port)` pairs that `probe_connectivity` tries, in order, to open a
/// TCP connection to.
///
/// Each host is combined with its port and parsed as a `SocketAddr`, so the
/// entries must be IP literals rather than DNS names.
const PROBE_TARGETS: &[(&str, u16)] = &[("1.1.1.1", 443), ("8.8.8.8", 53)];
/// Connect timeout applied to each individual probe target.
const PROBE_TIMEOUT: Duration = Duration::from_secs(5);
/// Reports whether at least one probe target accepts a TCP connection.
///
/// Targets are tried in the order they appear in `PROBE_TARGETS`; the first
/// successful connect short-circuits the rest. A target whose `host:port`
/// string does not parse as a `SocketAddr` is skipped. Each connect attempt
/// blocks for at most `PROBE_TIMEOUT`.
fn probe_connectivity() -> bool {
    PROBE_TARGETS
        .iter()
        // Unparseable entries count as unreachable and are simply dropped.
        .filter_map(|(host, port)| format!("{host}:{port}").parse::<SocketAddr>().ok())
        // The pipeline is lazy, so later targets are only parsed and dialed
        // when every earlier one has failed.
        .any(|addr| TcpStream::connect_timeout(&addr, PROBE_TIMEOUT).is_ok())
}
/// Builds the status line shown while a one-shot connectivity check runs.
///
/// `is_offline` takes precedence over `check_pending`: once offline, the
/// "still offline" wording is used regardless of the pending flag.
fn connectivity_check_message(is_offline: bool, check_pending: bool) -> String {
    let text = match (is_offline, check_pending) {
        (true, _) => "Still offline — verifying connectivity...",
        (false, true) => "Verifying network connectivity...",
        (false, false) => "Checking network connectivity...",
    };
    text.to_string()
}
/// Builds the status line shown while polling for connectivity to return.
///
/// * offline with a poll already pending: short "still offline" line;
/// * offline without a pending poll: the longer "run paused" notice;
/// * not offline: a neutral polling line (`poll_pending` is ignored).
fn offline_poll_message(is_offline: bool, poll_pending: bool) -> String {
    let text = match (is_offline, poll_pending) {
        (true, true) => "Still offline — polling for connectivity...",
        (true, false) => "Offline detected — run paused. No continuation budget or retry budget is being consumed. Waiting for connectivity to return.",
        (false, _) => "Polling for connectivity...",
    };
    text.to_string()
}
/// Returns the fixed notice emitted when connectivity comes back after an
/// offline window.
fn resume_confirmation_message() -> String {
    String::from(
        "Connectivity restored — resuming workflow. No continuation budget or retry budget was consumed during the offline window.",
    )
}
/// Which family of UI message(s) a connectivity poll should emit.
///
/// `Debug`, `PartialEq` and `Eq` are derived so that classification results
/// can be compared with `assert_eq!` and printed in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PollUiMessageType {
    /// Emit only the offline-polling status line.
    OfflinePoll,
    /// Emit only the connectivity-check status line.
    ConnectivityCheck,
    /// Emit the offline-polling status line followed by the resume notice.
    ResumeConfirmation,
}
/// Classifies a poll outcome into the kind of UI message(s) to emit.
///
/// Arms are checked top to bottom, so precedence is:
/// 1. offline before the poll and the probe succeeded: `ResumeConfirmation`;
/// 2. otherwise, state says offline: `OfflinePoll`;
/// 3. anything else: `ConnectivityCheck`.
fn determine_poll_ui_message_type(
    was_offline: bool,
    is_offline: bool,
    probe_result: bool,
) -> PollUiMessageType {
    match (was_offline, is_offline, probe_result) {
        (true, _, true) => PollUiMessageType::ResumeConfirmation,
        (_, true, _) => PollUiMessageType::OfflinePoll,
        _ => PollUiMessageType::ConnectivityCheck,
    }
}
/// Produces the ordered list of status lines for one poll iteration.
///
/// The kind of output is chosen by `determine_poll_ui_message_type`. On a
/// resume, the offline-polling line is emitted first and the resume notice
/// second; every other case yields exactly one line.
fn poll_ui_messages(
    was_offline: bool,
    connectivity: &ConnectivityState,
    probe_result: bool,
) -> Vec<String> {
    // Two arms need the same offline-polling line; build it on demand.
    let offline_line =
        || offline_poll_message(connectivity.is_offline, connectivity.poll_pending);

    match determine_poll_ui_message_type(was_offline, connectivity.is_offline, probe_result) {
        PollUiMessageType::ResumeConfirmation => {
            vec![offline_line(), resume_confirmation_message()]
        }
        PollUiMessageType::OfflinePoll => vec![offline_line()],
        PollUiMessageType::ConnectivityCheck => vec![connectivity_check_message(
            connectivity.is_offline,
            connectivity.check_pending,
        )],
    }
}
/// Wraps each line from `poll_ui_messages` in a `UIEvent::AgentActivity`
/// attributed to the `"connectivity"` agent, preserving message order.
fn poll_ui_events(
    was_offline: bool,
    connectivity: &ConnectivityState,
    probe_result: bool,
) -> Vec<UIEvent> {
    let into_activity = |message: String| UIEvent::AgentActivity {
        agent: String::from("connectivity"),
        message,
    };

    poll_ui_messages(was_offline, connectivity, probe_result)
        .into_iter()
        .map(into_activity)
        .collect()
}
/// Maps a probe outcome to the pipeline event reported to the reducer:
/// `ConnectivityCheckSucceeded` for `true`, `ConnectivityCheckFailed` for
/// `false`.
fn poll_connectivity_event(probe_result: bool) -> PipelineEvent {
    let outcome = match probe_result {
        true => AgentEvent::ConnectivityCheckSucceeded,
        false => AgentEvent::ConnectivityCheckFailed,
    };
    PipelineEvent::Agent(outcome)
}
/// Runs a single blocking connectivity probe and reports the outcome.
///
/// The returned `EffectResult` carries `ConnectivityCheckSucceeded` or
/// `ConnectivityCheckFailed` depending on `probe_connectivity`, plus one
/// `UIEvent::AgentActivity` from the `"connectivity"` agent. The status line
/// is derived from `connectivity` as passed in, so it reflects the state
/// before the probe ran.
pub(super) fn check_network_connectivity(connectivity: &ConnectivityState) -> EffectResult {
    let ui_event = UIEvent::AgentActivity {
        agent: String::from("connectivity"),
        message: connectivity_check_message(connectivity.is_offline, connectivity.check_pending),
    };

    let outcome = if probe_connectivity() {
        AgentEvent::ConnectivityCheckSucceeded
    } else {
        AgentEvent::ConnectivityCheckFailed
    };

    EffectResult::event(PipelineEvent::Agent(outcome)).with_ui_event(ui_event)
}
/// Waits `interval_ms` milliseconds, probes connectivity once, and reports
/// the outcome together with the matching UI status line(s).
///
/// This blocks the calling thread for the sleep plus the duration of the
/// probe. The returned `EffectResult` carries the event from
/// `poll_connectivity_event` and every UI event from `poll_ui_events`, in
/// order.
pub(super) fn poll_for_connectivity(
    interval_ms: u64,
    connectivity: &ConnectivityState,
) -> EffectResult {
    // Snapshot the offline flag before waiting; it decides whether a
    // successful probe counts as a "resume".
    let was_offline = connectivity.is_offline;

    std::thread::sleep(Duration::from_millis(interval_ms));
    let probe_result = probe_connectivity();

    let base = EffectResult::event(poll_connectivity_event(probe_result));
    poll_ui_events(was_offline, connectivity, probe_result)
        .into_iter()
        .fold(base, |acc, ui_event| acc.with_ui_event(ui_event))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: offline state with a poll already pending.
    fn offline_polling_state() -> ConnectivityState {
        ConnectivityState {
            is_offline: true,
            poll_pending: true,
            ..ConnectivityState::default()
        }
    }

    /// Returns the message text of an `AgentActivity` event; panics on any
    /// other variant.
    fn activity_message(event: &UIEvent) -> &str {
        match event {
            UIEvent::AgentActivity { message, .. } => message.as_str(),
            other => panic!("Expected AgentActivity, got: {:?}", other),
        }
    }

    // NOTE: the tests that call `probe_connectivity`,
    // `check_network_connectivity` or `poll_for_connectivity` perform a real
    // TCP probe. They only assert properties that hold whether or not the
    // network is reachable.

    #[test]
    fn test_probe_connectivity_does_not_panic() {
        let _ = probe_connectivity();
    }

    #[test]
    fn test_check_network_connectivity_returns_effect_result() {
        let state = ConnectivityState::default();
        let outcome = check_network_connectivity(&state);
        let _ = outcome.event;
    }

    #[test]
    fn test_check_network_connectivity_emits_ui_event_when_online() {
        let state = ConnectivityState::default();
        let outcome = check_network_connectivity(&state);
        assert!(
            !outcome.ui_events.is_empty(),
            "Should emit UI event when checking connectivity"
        );
    }

    #[test]
    fn test_poll_for_connectivity_returns_effect_result() {
        let state = offline_polling_state();
        let outcome = poll_for_connectivity(1, &state);
        let _ = outcome.event;
    }

    #[test]
    fn test_poll_for_connectivity_emits_ui_event() {
        let state = offline_polling_state();
        let outcome = poll_for_connectivity(1, &state);
        assert!(
            !outcome.ui_events.is_empty(),
            "Should emit UI event when polling for connectivity"
        );
    }

    #[test]
    fn test_poll_for_connectivity_emits_resume_confirmation_on_restore() {
        // Only the wording of the resume notice is checked here; no poll runs.
        let resume = resume_confirmation_message();
        assert!(
            resume.contains("Connectivity restored"),
            "Resume message should contain 'Connectivity restored', got: {resume}"
        );
        assert!(
            resume.contains("resuming") || resume.contains("resume"),
            "Resume message should mention resuming workflow, got: {resume}"
        );
    }

    #[test]
    fn test_poll_ui_events_does_not_emit_resume_when_still_offline() {
        let state = offline_polling_state();
        let events = poll_ui_events(true, &state, false);
        assert_eq!(
            events.len(),
            1,
            "Should emit exactly 1 event when still offline"
        );

        let msg = activity_message(&events[0]);
        assert!(
            !msg.contains("Connectivity restored"),
            "Should NOT emit resume message when probe failed, got: {msg}"
        );
        assert!(
            msg.contains("Still offline"),
            "Should emit still-offline message, got: {msg}"
        );
    }

    #[test]
    fn test_poll_ui_events_emits_resume_when_online_restored() {
        let state = offline_polling_state();
        let events = poll_ui_events(true, &state, true);
        assert_eq!(events.len(), 2, "Should emit exactly 2 events on restore");

        let first_msg = activity_message(&events[0]);
        assert!(
            first_msg.contains("Still offline") || first_msg.contains("polling"),
            "First event should be offline polling context, got: {first_msg}"
        );

        let second_msg = activity_message(&events[1]);
        assert!(
            second_msg.contains("Connectivity restored"),
            "Second event should be resume message, got: {second_msg}"
        );
    }

    #[test]
    fn test_poll_ui_events_emits_check_message_when_online_and_not_offline() {
        let state = ConnectivityState {
            is_offline: false,
            check_pending: true,
            ..ConnectivityState::default()
        };
        let events = poll_ui_events(false, &state, true);
        assert_eq!(events.len(), 1, "Should emit exactly 1 event");

        let msg = activity_message(&events[0]);
        assert!(
            !msg.contains("Connectivity restored"),
            "Should NOT emit resume when was_offline=false, got: {msg}"
        );
        assert!(
            !msg.contains("Still offline"),
            "Should NOT emit still-offline when online, got: {msg}"
        );
    }
}