use std::{
net::SocketAddr,
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
use hyphae::Gettable;
use marshal_entities::{
GetAllSessions, Message, NotifyChannel, SendMessage, SendMessageResult, Session, SessionId,
};
use myko::{
client::{ConnectionStatus, MykoClient, MykoProtocol},
core::item::Eventable,
server::{CellServerCtx, Persister},
wire::{MEvent, MEventType},
};
use myko_server::{BlackholePersister, CellServer};
use uuid::Uuid;
/// Upper bound on how long `wait_for` keeps polling a condition before it
/// panics with a "timed out" message.
const POLL_TIMEOUT: Duration = Duration::from_secs(8);
/// Handle to the in-process server started by `spawn_server`.
///
/// Both `shutdown` and `join` are wrapped in `Option` so that
/// `ServerHandle::shutdown` can `take()` them out of the handle.
struct ServerHandle {
/// Server context sent back by the server thread once the server is built;
/// the test reads the entity registry through it.
ctx: CellServerCtx,
/// Sending half of the channel the server thread listens on for a stop request.
shutdown: Option<std::sync::mpsc::Sender<()>>,
/// Join handle of the thread that owns the server's runtime.
join: Option<thread::JoinHandle<()>>,
}
impl ServerHandle {
    /// Asks the server thread to stop and blocks until it has exited.
    ///
    /// Both steps are best-effort: a failed send (the receiver is already
    /// gone) and a panicked server thread are deliberately ignored.
    fn shutdown(mut self) {
        let stop_signal = self.shutdown.take();
        let server_thread = self.join.take();

        // Signal first; the sender is dropped at the end of this block, before
        // we start waiting on the thread.
        if let Some(sender) = stop_signal {
            let _ = sender.send(());
        }

        if let Some(handle) = server_thread {
            let _ = handle.join();
        }
    }
}
/// Returns a TCP port on 127.0.0.1 that was free at the time of the call.
///
/// Binds a listener to port 0, reads back the port that was assigned, and
/// releases the listener before returning, so the port is no longer held
/// when the caller receives it.
///
/// # Panics
///
/// Panics if the loopback bind fails or the local address cannot be read.
fn pick_free_port() -> u16 {
    // The temporary listener is dropped as soon as this expression has
    // produced the port number.
    std::net::TcpListener::bind("127.0.0.1:0")
        .expect("pick free port")
        .local_addr()
        .unwrap()
        .port()
}
/// Starts a `CellServer` bound to `bind` on its own OS thread and returns a
/// handle to it.
///
/// The thread builds a multi-threaded Tokio runtime, constructs the server
/// with a `BlackholePersister` as its default persister, sends the server's
/// context back to the caller, and then runs until either `server.run()`
/// completes or the stop channel yields (a message was sent, or every sender
/// was dropped).
///
/// # Panics
///
/// Panics if the server context does not arrive within 5 seconds. The server
/// thread itself panics if the runtime cannot be built or the context cannot
/// be sent.
fn spawn_server(bind: SocketAddr) -> ServerHandle {
    use std::sync::mpsc;

    // Capacity-1 channel: the server thread sends exactly one context value.
    let (ctx_tx, ctx_rx) = mpsc::sync_channel::<CellServerCtx>(1);
    let (stop_tx, stop_rx) = mpsc::channel::<()>();

    let server_thread = thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("build runtime");

        runtime.block_on(async move {
            let persister: Arc<dyn Persister> = Arc::new(BlackholePersister);
            let server = Arc::new(
                CellServer::builder()
                    .with_bind_addr(bind)
                    .with_default_persister(persister)
                    .build(),
            );

            // Publish the context before entering the run loop so the caller
            // can proceed while the server is running.
            ctx_tx.send(server.ctx()).expect("send ctx");

            tokio::select! {
                _ = server.run() => {}
                // The std channel's `recv` blocks, so it is parked on the
                // blocking pool rather than on an async worker.
                _ = tokio::task::spawn_blocking(move || {
                    let _ = stop_rx.recv();
                }) => {}
            }
        });

        drop(runtime);
    });

    let ctx = ctx_rx
        .recv_timeout(Duration::from_secs(5))
        .expect("server thread came up");

    ServerHandle {
        ctx,
        shutdown: Some(stop_tx),
        join: Some(server_thread),
    }
}
/// Polls `f` roughly every 50 ms until it returns `true`.
///
/// `label` names the awaited condition and is embedded in the panic message.
///
/// The predicate is always evaluated at least once, and it is evaluated one
/// final time after the deadline has passed. A condition that becomes true
/// during the last sleep is therefore still observed instead of being
/// reported as a timeout.
///
/// # Panics
///
/// Panics with `timed out waiting for: {label}` if `f` is still `false` on
/// the first check made at or after `POLL_TIMEOUT` has elapsed.
fn wait_for(label: &str, mut f: impl FnMut() -> bool) {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let deadline = Instant::now() + POLL_TIMEOUT;
    loop {
        if f() {
            return;
        }
        // Test the deadline only after a failed check: `thread::sleep` may
        // overshoot, and the state right after the sleep must still be seen.
        if Instant::now() >= deadline {
            panic!("timed out waiting for: {label}");
        }
        thread::sleep(POLL_INTERVAL);
    }
}
/// Builds a minimal `Session` fixture with the given id and nickname.
///
/// `pid` is 0, `cwd` is `/repo`, `connected_at` is the current UTC time in
/// milliseconds, and every optional field, including `client_id`, is `None`.
fn make_session(id: &str, nickname: &str) -> Session {
    let session_id = SessionId(Arc::from(id));
    let connected_at = chrono::Utc::now().timestamp_millis();

    Session {
        // Identity and fixed fixture values.
        id: session_id,
        nickname: nickname.into(),
        pid: 0,
        cwd: "/repo".into(),
        connected_at,
        // Everything optional starts out unset.
        client_id: None,
        git_branch: None,
        current_task: None,
        last_activity_at: None,
        last_tool: None,
        last_tool_at: None,
        operator: None,
        host: None,
        project: None,
    }
}
/// Sends `session` to the server as a `SET` event through `client`.
///
/// A freshly generated v4 UUID string is passed as the third argument of
/// `MEvent::from_item` (presumably an id for the event; confirm against
/// the myko API).
///
/// # Panics
///
/// Panics if `send_event` returns an error.
fn send_session_set(client: &MykoClient, session: &Session) {
    let fresh_id = Uuid::new_v4().to_string();
    let set_event = MEvent::from_item(session, MEventType::SET, &fresh_id);
    client.send_event(set_event).expect("send_event");
}
/// Returns how many entries the server's registry currently holds for the
/// `Message` entity, or 0 when the registry has no store under that name.
fn message_count(ctx: &CellServerCtx) -> usize {
    let message_store = ctx.registry.get(Message::ENTITY_NAME_STATIC);
    message_store.map_or(0, |store| store.entries().get().len())
}
/// End-to-end check of `SendMessage` between two live clients.
///
/// Flow:
/// 1. Start an in-process server on a free loopback port.
/// 2. Connect clients A and B over JSON, and register a `NotifyChannel`
///    handler on B that records the pushed command.
/// 3. Each client publishes its own session; wait until A sees both sessions
///    with a `client_id` set.
/// 4. A sends a message to B's session. The test then asserts that the
///    command succeeds, that B receives the push with the expected content
///    and metadata, and that exactly one `Message` is stored on the server.
#[test]
fn send_message_delivers_then_persists_when_recipient_is_live() {
    let _ = env_logger::builder().is_test(true).try_init();
    marshal_entities::link();
    daemon::link();

    let port = pick_free_port();
    let bind = SocketAddr::from(([127, 0, 0, 1], port));
    let addr = format!("ws://{bind}");
    let server = spawn_server(bind);

    let client_a = MykoClient::new();
    client_a.set_protocol(MykoProtocol::JSON);
    let client_b = MykoClient::new();
    client_b.set_protocol(MykoProtocol::JSON);

    // Slot the handler writes into; the test thread polls it further down.
    let received: Arc<Mutex<Option<NotifyChannel>>> = Arc::new(Mutex::new(None));
    let received_for_handler = Arc::clone(&received);
    let notify_guard = client_b.on_command::<NotifyChannel, _>(move |cmd, _responder| {
        *received_for_handler.lock().expect("notify mutex") = Some(cmd);
    });
    // Leaked on purpose so the guard is never dropped during the test.
    // NOTE(review): presumably dropping it would unregister the handler; confirm.
    Box::leak(Box::new(notify_guard));

    // B's watch is only held, never read; A's watch is polled below.
    let _b_sessions = client_b.watch_query::<GetAllSessions>(GetAllSessions {});
    let a_sessions = client_a.watch_query::<GetAllSessions>(GetAllSessions {});

    client_a.set_address(Some(addr.clone()));
    client_b.set_address(Some(addr));

    let status_a = client_a.connection_status();
    let status_b = client_b.connection_status();
    wait_for("A connected", || {
        matches!(status_a.get(), ConnectionStatus::Connected(_))
    });
    wait_for("B connected", || {
        matches!(status_b.get(), ConnectionStatus::Connected(_))
    });

    // NOTE(review): fixed pause kept from the original; presumably it lets the
    // query subscriptions settle after connecting. Confirm whether a
    // `wait_for` on an observable condition could replace it.
    thread::sleep(Duration::from_millis(200));

    send_session_set(&client_a, &make_session("a", "alpha"));
    send_session_set(&client_b, &make_session("b", "bravo"));

    wait_for("both sessions visible to A with client_ids", || {
        let sessions = a_sessions.get();
        let a = sessions.iter().find(|s| s.id.0.as_ref() == "a");
        let b = sessions.iter().find(|s| s.id.0.as_ref() == "b");
        matches!((a, b), (Some(a), Some(b)) if a.client_id.is_some() && b.client_id.is_some())
    });

    let cmd = SendMessage {
        to_session_id: SessionId(Arc::from("b")),
        body: "hello bravo".into(),
        as_session: None,
    };
    let response_cell = client_a.send_command::<SendMessage, SendMessageResult>(&cmd);

    // Wait for any response, not just a successful one: an error response then
    // fails right away with its details via `expect("ok")` below, instead of
    // surfacing as an uninformative timeout.
    wait_for("A's send_command response", || {
        response_cell.get().is_some()
    });
    let result = response_cell.get().expect("got response").expect("ok");
    assert_eq!(result.to_nick, "bravo");

    wait_for("B received NotifyChannel", || {
        received.lock().expect("notify mutex").is_some()
    });
    let push = received
        .lock()
        .expect("notify mutex")
        .take()
        .expect("push");

    assert!(
        push.content.contains("hello bravo"),
        "channel content should carry the message body, got: {}",
        push.content,
    );
    assert_eq!(
        push.meta.get("kind"),
        Some(&serde_json::json!("new_message"))
    );
    assert_eq!(
        push.meta.get("from_nick"),
        Some(&serde_json::json!("alpha"))
    );
    assert_eq!(
        message_count(&server.ctx),
        1,
        "Message must be persisted on successful delivery",
    );

    server.shutdown();
}