#![cfg(feature = "comms")]
use std::path::{Path, PathBuf};
use std::process::{Child, Command};
use std::time::{Duration, Instant};
use basemind::comms::client::CommsClient;
use basemind::comms::ids::{AgentId, RoomId};
use basemind::comms::model::RoomScope;
use basemind::comms::singleton::{CommsPaths, comms_socket_path, probe_alive};
const BIN: &str = env!("CARGO_BIN_EXE_basemind");
struct Daemon {
child: Child,
comms_dir: PathBuf,
socket: PathBuf,
}
impl Daemon {
fn start(comms_dir: &Path) -> Self {
let socket = comms_socket_path(comms_dir);
let child = Command::new(BIN)
.args(["comms", "daemon"])
.env("BASEMIND_COMMS_DIR", comms_dir)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("spawn comms daemon");
let daemon = Self {
child,
comms_dir: comms_dir.to_path_buf(),
socket,
};
let deadline = Instant::now() + Duration::from_secs(10);
while Instant::now() < deadline {
if probe_alive(&daemon.socket) {
return daemon;
}
std::thread::sleep(Duration::from_millis(50));
}
panic!("comms daemon did not become ready");
}
fn socket(&self) -> &Path {
&self.socket
}
}
impl Drop for Daemon {
fn drop(&mut self) {
let _ = Command::new(BIN)
.args(["comms", "stop"])
.env("BASEMIND_COMMS_DIR", &self.comms_dir)
.output();
if self.child.try_wait().ok().flatten().is_none() {
std::thread::sleep(Duration::from_millis(200));
if self.child.try_wait().ok().flatten().is_none() {
let _ = self.child.kill();
}
}
let _ = self.child.wait();
}
}
async fn connect(socket: &Path, agent: &str, root: &Path) -> CommsClient {
let paths = CommsPaths {
comms_dir: socket.parent().expect("socket parent").to_path_buf(),
socket_path: socket.to_path_buf(),
};
CommsClient::connect(
&paths,
AgentId::parse(agent).expect("agent id"),
None,
Some(root.to_path_buf()),
)
.await
.unwrap_or_else(|e| panic!("connect {agent}: {e}"))
}
#[tokio::test(flavor = "multi_thread")]
async fn inbox_ack_advances_cursor_without_touching_shared_log_or_other_agents() {
let tmp = tempfile::tempdir().expect("tempdir");
let comms_dir = tmp.path().join("comms");
let root = tmp.path().to_path_buf();
let daemon = Daemon::start(&comms_dir);
let socket = daemon.socket().to_path_buf();
let room = RoomId::parse("team").expect("room");
let mut alice = connect(&socket, "agent-alice", &root).await;
alice
.create_room(room.clone(), RoomScope::Global, Some("Team".to_string()))
.await
.expect("create room");
let scope = vec!["src/**".to_string(), "docs/**".to_string()];
let m1 = alice
.post_message(
room.clone(),
"first".to_string(),
b"body one".to_vec(),
vec!["ops".to_string()],
None,
scope.clone(),
)
.await
.expect("post m1");
let _m2 = alice
.post_message(
room.clone(),
"second".to_string(),
b"body two".to_vec(),
vec![],
None,
vec![],
)
.await
.expect("post m2");
let mut bob = connect(&socket, "agent-bob", &root).await;
let mut carol = connect(&socket, "agent-carol", &root).await;
bob.join_room(room.clone()).await.expect("bob joins");
carol.join_room(room.clone()).await.expect("carol joins");
let (bob_inbox, _unread, _c) = bob
.read_inbox(None, None, None, 100, false)
.await
.expect("bob inbox");
assert_eq!(bob_inbox.len(), 2, "both messages are unread for Bob");
let first = bob_inbox
.iter()
.find(|sm| sm.meta.id == m1)
.expect("m1 in inbox");
assert!(first.meta.ts_micros > 0, "ts_micros surfaced");
assert_eq!(first.meta.tags, vec!["ops".to_string()], "tags surfaced");
assert_eq!(first.meta.scope, scope, "scope round-trips through post");
assert_eq!(first.seq, 1, "seq surfaced (first message in the room)");
let (acked, cursors) = bob
.ack_inbox(vec![m1.clone()], None, None)
.await
.expect("bob ack m1");
assert_eq!(acked, 1, "one id resolved + acked");
assert_eq!(
cursors,
vec![("team".to_string(), 1)],
"cursor advanced to seq 1"
);
let (bob_after, _u, _c) = bob
.read_inbox(None, None, None, 100, false)
.await
.expect("bob inbox after ack");
assert_eq!(bob_after.len(), 1, "acked message dropped from Bob's inbox");
assert_eq!(bob_after[0].meta.subject, "second");
let (history, _next) = bob
.read_history(room.clone(), None, 100)
.await
.expect("history");
assert_eq!(history.len(), 2, "ack must not delete from the shared log");
let (carol_inbox, _u, _c) = carol
.read_inbox(None, None, None, 100, false)
.await
.expect("carol inbox");
assert_eq!(carol_inbox.len(), 2, "another agent's inbox is untouched");
let (acked2, cursors2) = carol
.ack_inbox(vec![], Some(room.clone()), Some(2))
.await
.expect("carol bulk ack");
assert_eq!(acked2, 0, "bulk mode acks no specific ids");
assert_eq!(cursors2, vec![("team".to_string(), 2)]);
let (carol_after, _u, _c) = carol
.read_inbox(None, None, None, 100, false)
.await
.expect("carol inbox after bulk ack");
assert!(carol_after.is_empty(), "to_seq bulk-acked the whole room");
let err = bob.ack_inbox(vec![], None, None).await;
assert!(err.is_err(), "empty ack must be rejected");
}
#[tokio::test(flavor = "multi_thread")]
async fn client_recovers_when_daemon_dies_mid_session() {
let tmp = tempfile::tempdir().expect("tempdir");
let comms_dir = tmp.path().join("comms");
let root = tmp.path().to_path_buf();
let mut daemon = Daemon::start(&comms_dir);
let paths = CommsPaths {
comms_dir: comms_dir.clone(),
socket_path: comms_socket_path(&comms_dir),
};
let spawn_dir = comms_dir.clone();
let mut client = CommsClient::connect_with_respawn(
&paths,
AgentId::parse("agent-resilient").expect("agent id"),
None,
Some(root.clone()),
move |_paths: &CommsPaths| {
Command::new(BIN)
.args(["comms", "daemon"])
.env("BASEMIND_COMMS_DIR", &spawn_dir)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.map(|_| ())
},
)
.await
.expect("connect with respawn");
let room = RoomId::parse("team").expect("room");
client
.create_room(room.clone(), RoomScope::Global, Some("Team".to_string()))
.await
.expect("create room");
let first = client
.post_message(
room.clone(),
"before".to_string(),
b"first".to_vec(),
vec![],
None,
vec![],
)
.await
.expect("post before death");
assert!(!first.is_empty(), "first post returns an id");
let _ = daemon.child.kill();
let _ = daemon.child.wait();
let deadline = Instant::now() + Duration::from_secs(5);
while Instant::now() < deadline && probe_alive(&paths.socket_path) {
std::thread::sleep(Duration::from_millis(25));
}
assert!(
!probe_alive(&paths.socket_path),
"daemon must be dead before the recovery post"
);
let second = client
.post_message(
room.clone(),
"after".to_string(),
b"second".to_vec(),
vec![],
None,
vec![],
)
.await
.expect("post after death must transparently recover");
assert!(!second.is_empty(), "recovered post returns an id");
assert_ne!(first, second, "recovered post is a distinct message");
let (history, _next) = client
.read_history(room.clone(), None, 100)
.await
.expect("history after recovery");
assert_eq!(
history.len(),
2,
"both the pre-death and post-recovery messages are in the log"
);
let _ = Command::new(BIN)
.args(["comms", "stop"])
.env("BASEMIND_COMMS_DIR", &comms_dir)
.output();
}