use std::sync::Arc;
use std::time::Duration;
use moire::task::FutureExt;
use vox_types::{MetadataEntry, MethodId, Payload, RequestCall};
use super::utils::*;
use crate::session::{SessionAcceptOutcome, SessionRegistry, acceptor_on};
use crate::{Attachment, NoopClient, TransportMode};
/// Checks that a call which is still pending when the first link is closed completes
/// successfully after the acceptor resumes the session on a second link using the same
/// `SessionRegistry`.
///
/// Flow: establish over link 1, start a call carrying `66`, wait for the handler's
/// `started` signal, close both ends of link 1, accept on link 2 (expecting
/// `SessionAcceptOutcome::Resumed`), signal `release`, then expect the call to return `66`.
///
/// Every wait, including the final join on the call task, is bounded by a timeout so that
/// a lost response fails the test instead of hanging it.
#[tokio::test]
async fn resumable_acceptor_registry_keeps_pending_call_alive_across_auto_resume() {
    let registry = SessionRegistry::default();
    let (client_link1, client_break1, server_link1, server_break1) = breakable_link_pair(64);
    let (client_link2, client_break2, server_link2, server_break2) = breakable_link_pair(64);
    // Both client-side links are queued in the source; the second one is presumably
    // picked up by the initiator once the first is broken (TODO confirm in TestLinkSource).
    let source = TestLinkSource::new([
        Attachment::initiator(client_link1),
        Attachment::initiator(client_link2),
    ]);
    let started = Arc::new(tokio::sync::Notify::new());
    let started_for_wait = Arc::clone(&started);
    // Created before the handler exists, so the waiter is in place prior to any signal.
    let started_wait = started_for_wait.notified();
    let release = Arc::new(tokio::sync::Notify::new());
    // Run both handshake halves concurrently; each is individually bounded to one second.
    let (server_established, client_established) = tokio::try_join!(
        tokio::time::timeout(
            Duration::from_secs(1),
            acceptor_on(server_link1)
                .session_registry(registry.clone())
                .on_connection(ResumableReplyingHandler {
                    started,
                    release: Arc::clone(&release),
                })
                .metadata(vec![MetadataEntry::str("vox-service", "Noop")])
                .establish_or_resume::<NoopClient>(),
        ),
        tokio::time::timeout(
            Duration::from_secs(1),
            crate::initiator(source, TransportMode::Bare).establish::<NoopClient>(),
        ),
    )
    .expect("initial session establishment timed out");
    let server_caller = match server_established.expect("server handshake failed") {
        SessionAcceptOutcome::Established(client) => client,
        SessionAcceptOutcome::Resumed => panic!("first accept should establish a new session"),
    };
    let caller = client_established.expect("client handshake failed");
    // Keep a session handle outside the spawned task so the session can be shut down at the end.
    let client_session_handle = caller.session.clone().unwrap();
    let call_task = moire::task::spawn(
        async move {
            caller
                .caller
                .call(RequestCall {
                    method_id: MethodId(1),
                    args: Payload::outgoing(&66_u32),
                    schemas: Default::default(),
                    metadata: Default::default(),
                })
                .await
        }
        .named("registry_resume_pending_call"),
    );
    // Only break the link once the server-side handler has signalled `started`, i.e. the
    // call is in flight.
    tokio::time::timeout(Duration::from_secs(1), started_wait)
        .await
        .expect("timed out waiting for handler start");
    client_break1.close().await;
    server_break1.close().await;
    // Accept on the second link with the same registry; this must resume, not re-establish.
    let server_accept_result = tokio::time::timeout(
        Duration::from_secs(1),
        acceptor_on(server_link2)
            .session_registry(registry.clone())
            .on_connection(ResumableReplyingHandler {
                started: Arc::new(tokio::sync::Notify::new()),
                release: Arc::clone(&release),
            })
            .metadata(vec![MetadataEntry::str("vox-service", "Noop")])
            .establish_or_resume::<NoopClient>(),
    )
    .await
    .expect("timed out waiting for resume");
    match server_accept_result.expect("server accept should succeed") {
        SessionAcceptOutcome::Resumed => {}
        SessionAcceptOutcome::Established(_) => {
            panic!("registry accept should have resumed the existing session")
        }
    }
    // Presumably unblocks the handler so it can reply (handler is defined elsewhere).
    release.notify_waiters();
    // Bounded like every other wait here: a response that never arrives must fail the test.
    let response = tokio::time::timeout(Duration::from_secs(1), call_task)
        .await
        .expect("timed out waiting for pending call to complete")
        .expect("call task join")
        .expect("call should succeed after registry-driven session resume");
    let response = response.get();
    let ret_bytes = match &response.ret {
        Payload::PostcardBytes(bytes) => *bytes,
        _ => panic!("expected incoming payload in response"),
    };
    let value: u32 = vox_postcard::from_slice(ret_bytes).expect("deserialize response");
    assert_eq!(value, 66);
    // Explicit teardown: release the server-side caller, shut the client session down
    // (result intentionally ignored), then close the second link.
    drop(server_caller);
    let _ = client_session_handle.shutdown();
    client_break2.close().await;
    server_break2.close().await;
}
#[tokio::test]
async fn resumable_source_initiator_keeps_pending_call_alive_across_auto_resume() {
let registry = SessionRegistry::default();
let (client_link1, client_break1, server_link1, server_break1) = breakable_link_pair(64);
let (client_link2, client_break2, server_link2, server_break2) = breakable_link_pair(64);
let source = TestLinkSource::new([
Attachment::initiator(client_link1),
Attachment::initiator(client_link2),
]);
let started = Arc::new(tokio::sync::Notify::new());
let started_for_wait = Arc::clone(&started);
let started_wait = started_for_wait.notified();
let release = Arc::new(tokio::sync::Notify::new());
let (server_established, client_established) = tokio::try_join!(
tokio::time::timeout(
Duration::from_secs(1),
acceptor_on(server_link1)
.session_registry(registry.clone())
.on_connection(ResumableReplyingHandler {
started,
release: Arc::clone(&release),
})
.metadata(vec![MetadataEntry::str("vox-service", "Noop")])
.establish_or_resume::<NoopClient>(),
),
tokio::time::timeout(
Duration::from_secs(1),
crate::initiator(source, TransportMode::Bare).establish::<NoopClient>(),
),
)
.expect("initial session establishment timed out");
let server_caller = match server_established.expect("server handshake failed") {
SessionAcceptOutcome::Established(client) => client,
SessionAcceptOutcome::Resumed => panic!("first accept should establish a new session"),
};
let caller = client_established.expect("client handshake failed");
let client_session_handle = caller.session.clone().unwrap();
let call_task = moire::task::spawn(
async move {
caller
.caller
.call(RequestCall {
method_id: MethodId(1),
args: Payload::outgoing(&77_u32),
schemas: Default::default(),
metadata: Default::default(),
})
.await
}
.named("source_auto_resume_pending_call"),
);
tokio::time::timeout(Duration::from_secs(1), started_wait)
.await
.expect("timed out waiting for handler start");
client_break1.close().await;
server_break1.close().await;
let server_accept_result = tokio::time::timeout(
Duration::from_secs(1),
acceptor_on(server_link2)
.session_registry(registry.clone())
.on_connection(ResumableReplyingHandler {
started: Arc::new(tokio::sync::Notify::new()),
release: Arc::clone(&release),
})
.metadata(vec![MetadataEntry::str("vox-service", "Noop")])
.establish_or_resume::<NoopClient>(),
)
.await
.expect("timed out waiting for source-driven resume");
match server_accept_result.expect("server accept should succeed") {
SessionAcceptOutcome::Resumed => {}
SessionAcceptOutcome::Established(_) => {
panic!("registry accept should have resumed the existing session")
}
}
release.notify_waiters();
let response = call_task
.await
.expect("call task join")
.expect("call should succeed after source-driven auto-resume");
let response = response.get();
let ret_bytes = match &response.ret {
Payload::PostcardBytes(bytes) => *bytes,
_ => panic!("expected incoming payload in response"),
};
let value: u32 = vox_postcard::from_slice(ret_bytes).expect("deserialize response");
assert_eq!(value, 77);
drop(server_caller);
let _ = client_session_handle.shutdown();
client_break2.close().await;
server_break2.close().await;
}
/// Checks that when the second accept uses a different `SessionRegistry` than the one
/// the session was established with, the reconnection yields a fresh
/// `SessionAcceptOutcome::Established` session and the pending call still completes.
///
/// Flow: establish over link 1 with `initial_registry`, start a call carrying `88`, wait
/// for the first handler's `started` signal, close both ends of link 1, accept on link 2
/// with `restarted_registry` (expecting `Established`), wait for the second handler's
/// `started` signal, signal `release`, then expect the call to return `88`.
///
/// The final join on the call task is bounded by a timeout, matching the other waits in
/// this test, so a lost response fails the test rather than hanging it.
#[tokio::test]
async fn resumable_source_initiator_falls_back_to_fresh_session_when_resume_key_unknown() {
    let initial_registry = SessionRegistry::default();
    // A second, independent registry stands in for a restarted server; accepting against
    // it is expected below to produce a fresh session rather than a resume.
    let restarted_registry = SessionRegistry::default();
    let (client_link1, client_break1, server_link1, server_break1) = breakable_link_pair(64);
    let (client_link2, client_break2, server_link2, server_break2) = breakable_link_pair(64);
    let source = TestLinkSource::new([
        Attachment::initiator(client_link1),
        Attachment::initiator(client_link2),
    ]);
    let started = Arc::new(tokio::sync::Notify::new());
    let started_for_wait = Arc::clone(&started);
    // Register the waiter before the first handler is constructed.
    let started_wait = started_for_wait.notified();
    let release = Arc::new(tokio::sync::Notify::new());
    // Acceptor and initiator handshakes run concurrently, each under a one-second timeout.
    let (server_established, client_established) = tokio::try_join!(
        tokio::time::timeout(
            Duration::from_secs(1),
            acceptor_on(server_link1)
                .session_registry(initial_registry.clone())
                .on_connection(ResumableReplyingHandler {
                    started,
                    release: Arc::clone(&release),
                })
                .metadata(vec![MetadataEntry::str("vox-service", "Noop")])
                .establish_or_resume::<NoopClient>(),
        ),
        tokio::time::timeout(
            Duration::from_secs(1),
            crate::initiator(source, TransportMode::Bare).establish::<NoopClient>(),
        ),
    )
    .expect("initial session establishment timed out");
    let initial_server_caller = match server_established.expect("server handshake failed") {
        SessionAcceptOutcome::Established(client) => client,
        SessionAcceptOutcome::Resumed => panic!("first accept should establish a new session"),
    };
    let caller = client_established.expect("client handshake failed");
    // `caller` is moved into the task below, so grab the session handle for teardown first.
    let client_session_handle = caller.session.clone().unwrap();
    let call_task = moire::task::spawn(
        async move {
            caller
                .caller
                .call(RequestCall {
                    method_id: MethodId(1),
                    args: Payload::outgoing(&88_u32),
                    schemas: Default::default(),
                    metadata: Default::default(),
                })
                .await
        }
        .named("source_auto_resume_unknown_key_then_fresh"),
    );
    // Break the first link only after its handler has signalled `started`.
    tokio::time::timeout(Duration::from_secs(1), started_wait)
        .await
        .expect("timed out waiting for handler start");
    client_break1.close().await;
    server_break1.close().await;
    let restarted_started = Arc::new(tokio::sync::Notify::new());
    let restarted_started_waiter = Arc::clone(&restarted_started);
    // As above, the waiter is created before the second handler exists.
    let restarted_started_wait = restarted_started_waiter.notified();
    let restarted_accept = tokio::time::timeout(
        Duration::from_secs(1),
        acceptor_on(server_link2)
            .session_registry(restarted_registry.clone())
            .on_connection(ResumableReplyingHandler {
                started: restarted_started,
                release: Arc::clone(&release),
            })
            .metadata(vec![MetadataEntry::str("vox-service", "Noop")])
            .establish_or_resume::<NoopClient>(),
    )
    .await
    .expect("timed out waiting for fallback reconnection");
    let restarted_server_caller = match restarted_accept.expect("server accept should succeed") {
        SessionAcceptOutcome::Established(client) => client,
        SessionAcceptOutcome::Resumed => panic!("fallback should establish a fresh session"),
    };
    // The second handler must signal `started` too — presumably because the pending call
    // is re-issued on the fresh session (TODO confirm against the initiator's recovery logic).
    tokio::time::timeout(Duration::from_secs(1), restarted_started_wait)
        .await
        .expect("timed out waiting for restarted handler start");
    release.notify_waiters();
    // Bound the join so a response that never arrives is reported as a failure, not a hang.
    let response = tokio::time::timeout(Duration::from_secs(1), call_task)
        .await
        .expect("timed out waiting for pending call to complete")
        .expect("call task join")
        .expect("call should succeed after fallback reconnection");
    let response = response.get();
    let ret_bytes = match &response.ret {
        Payload::PostcardBytes(bytes) => *bytes,
        _ => panic!("expected incoming payload in response"),
    };
    let value: u32 = vox_postcard::from_slice(ret_bytes).expect("deserialize response");
    assert_eq!(value, 88);
    // Explicit teardown; the shutdown result is deliberately ignored.
    drop(initial_server_caller);
    drop(restarted_server_caller);
    let _ = client_session_handle.shutdown();
    client_break2.close().await;
    server_break2.close().await;
}
/// Checks that a resumable initiator configured with a 500 ms recovery timeout (and a
/// 200 ms connect timeout) stops trying to recover after its link is closed.
///
/// Flow: establish over a single link, make one call with `42` and expect `42` back,
/// close both ends of the link, then measure how long it takes for `caller.closed()` to
/// resolve. The wait must take at least 400 ms and must finish within 3 s.
///
/// The wait on `closed()` is itself bounded by a 3 s timeout: if recovery never gives
/// up, the test fails with a clear message instead of hanging indefinitely.
#[tokio::test]
async fn recovery_timeout_gives_up_after_deadline() {
    let (client_link1, client_break1, server_link1, server_break1) = breakable_link_pair(64);
    // Only one link is provided, so presumably there is nothing to reconnect to once it
    // is closed (TODO confirm TestLinkSource behaviour when exhausted).
    let source = TestLinkSource::new([Attachment::initiator(client_link1)]);
    // Acceptor and initiator handshakes run concurrently, each under a one-second timeout.
    let (server_established, client_established) = tokio::try_join!(
        tokio::time::timeout(
            Duration::from_secs(1),
            acceptor_on(server_link1)
                .on_connection(EchoHandler)
                .establish::<NoopClient>(),
        ),
        tokio::time::timeout(
            Duration::from_secs(1),
            crate::initiator(source, TransportMode::Bare)
                .resumable()
                .recovery_timeout(Duration::from_millis(500))
                .connect_timeout(Duration::from_millis(200))
                .establish::<NoopClient>(),
        ),
    )
    .expect("initial establishment timed out");
    // Bound to a name (not `_`) so the server side stays alive until the end of the test.
    let _server = server_established.expect("server establish");
    let client = client_established.expect("client establish");
    // Sanity check: the session works before the link is broken.
    let args: u32 = 42;
    let response = client
        .caller
        .call(RequestCall {
            method_id: MethodId(1),
            args: Payload::outgoing(&args),
            schemas: Default::default(),
            metadata: Default::default(),
        })
        .await
        .expect("call should succeed");
    let response = response.get();
    let ret_bytes = match &response.ret {
        Payload::PostcardBytes(bytes) => *bytes,
        _ => panic!("expected postcard bytes"),
    };
    let result: u32 = vox_postcard::from_slice(ret_bytes).expect("deserialize");
    assert_eq!(result, 42);
    client_break1.close().await;
    server_break1.close().await;
    let start = std::time::Instant::now();
    // Upper bound enforced by the timeout itself, so a recovery loop that never gives up
    // is reported as a failure rather than blocking the test run forever.
    let closed = tokio::time::timeout(Duration::from_secs(3), client.caller.closed()).await;
    let elapsed = start.elapsed();
    assert!(
        closed.is_ok(),
        "recovery should have given up, but took {elapsed:?}"
    );
    // Lower bound sits slightly under the 500 ms recovery timeout to tolerate timer jitter.
    assert!(
        elapsed >= Duration::from_millis(400),
        "recovery gave up too quickly: {elapsed:?}"
    );
}