use super::*;
fn temp_broker() -> (tempfile::TempDir, Arc<Broker>) {
let dir = tempfile::tempdir().expect("tempdir");
let store = Arc::new(CommsStore::open(dir.path()).expect("store"));
(dir, Arc::new(Broker::new(store)))
}
fn agent(s: &str) -> AgentId {
AgentId::parse(s).expect("agent")
}
#[tokio::test]
async fn hello_rejects_proto_skew() {
let (_d, broker) = temp_broker();
let (tx, _rx) = mpsc::channel(8);
let mut session = Session::default();
let resp = broker
.handle(
CommsRequest::Hello {
agent: agent("a"),
proto_ver: PROTO_VER + 1,
remote: None,
cwd: None,
},
&mut session,
&tx,
)
.await;
assert!(matches!(resp, CommsResponse::Error { code, .. } if code == "proto_skew"));
}
#[tokio::test]
async fn post_requires_hello() {
let (_d, broker) = temp_broker();
let (tx, _rx) = mpsc::channel(8);
let mut session = Session::default();
let resp = broker
.handle(
CommsRequest::Post {
room: RoomId::parse("r").expect("r"),
subject: "s".to_string(),
tags: vec![],
reply_to: None,
scope: vec![],
body: b"b".to_vec(),
},
&mut session,
&tx,
)
.await;
assert!(matches!(resp, CommsResponse::Error { code, .. } if code == "no_hello"));
}
#[tokio::test]
async fn subscribe_then_post_fans_out_notification() {
let (_d, broker) = temp_broker();
let (tx, mut rx) = mpsc::channel(8);
let mut session = Session::default();
broker
.handle(
CommsRequest::Hello {
agent: agent("a"),
proto_ver: PROTO_VER,
remote: None,
cwd: None,
},
&mut session,
&tx,
)
.await;
let room = RoomId::parse("r").expect("r");
broker
.handle(
CommsRequest::CreateRoom {
room: room.clone(),
scope: RoomScope::Global,
title: None,
},
&mut session,
&tx,
)
.await;
let sub_resp = broker
.handle(
CommsRequest::Subscribe { room: room.clone() },
&mut session,
&tx,
)
.await;
assert!(matches!(sub_resp, CommsResponse::Subscribed { .. }));
assert_eq!(broker.subscriber_count(), 1);
let posted = broker
.handle(
CommsRequest::Post {
room: room.clone(),
subject: "hi".to_string(),
tags: vec![],
reply_to: None,
scope: vec![],
body: b"hello".to_vec(),
},
&mut session,
&tx,
)
.await;
assert!(matches!(posted, CommsResponse::Posted { .. }));
let note = rx.recv().await.expect("notification");
match note {
CommsOut::Notification(CommsNotification::Message(meta)) => {
assert_eq!(meta.subject, "hi");
assert_eq!(meta.room, room);
}
other => panic!("expected a Message notification, got {other:?}"),
}
}
#[test]
fn sanitize_id_maps_to_alphabet() {
assert_eq!(sanitize_id("github.com/foo/bar"), "github.com-foo-bar");
assert!(RoomId::parse(sanitize_id("a b!c")).is_ok());
}
async fn hello_join(
broker: &Broker,
tx: &mpsc::Sender<CommsOut>,
who: &str,
room: &RoomId,
) -> Session {
let mut session = Session::default();
broker
.handle(
CommsRequest::Hello {
agent: agent(who),
proto_ver: PROTO_VER,
remote: None,
cwd: None,
},
&mut session,
tx,
)
.await;
broker
.handle(
CommsRequest::CreateRoom {
room: room.clone(),
scope: RoomScope::Global,
title: None,
},
&mut session,
tx,
)
.await;
broker
.handle(CommsRequest::Join { room: room.clone() }, &mut session, tx)
.await;
session
}
async fn post(
broker: &Broker,
session: &mut Session,
tx: &mpsc::Sender<CommsOut>,
room: &RoomId,
subject: &str,
) -> String {
match broker
.handle(
CommsRequest::Post {
room: room.clone(),
subject: subject.to_string(),
tags: vec![],
reply_to: None,
scope: vec![],
body: subject.as_bytes().to_vec(),
},
session,
tx,
)
.await
{
CommsResponse::Posted { message_id } => message_id,
other => panic!("expected Posted, got {other:?}"),
}
}
async fn inbox(
broker: &Broker,
session: &mut Session,
tx: &mpsc::Sender<CommsOut>,
) -> Vec<SeqMeta> {
match broker
.handle(
CommsRequest::Inbox {
remote: None,
cwd: None,
cursor: None,
limit: None,
mark_read: false,
},
session,
tx,
)
.await
{
CommsResponse::Inbox { messages, .. } => messages,
other => panic!("expected Inbox, got {other:?}"),
}
}
#[tokio::test]
async fn ack_by_ids_advances_only_the_acking_agents_cursor() {
let (_d, broker) = temp_broker();
let (tx, _rx) = mpsc::channel(64);
let room = RoomId::parse("r").expect("r");
let mut alice = hello_join(&broker, &tx, "alice", &room).await;
let mut bob = hello_join(&broker, &tx, "bob", &room).await;
let mut carol = hello_join(&broker, &tx, "carol", &room).await;
let m1 = post(&broker, &mut alice, &tx, &room, "first").await;
let _m2 = post(&broker, &mut alice, &tx, &room, "second").await;
assert_eq!(inbox(&broker, &mut bob, &tx).await.len(), 2);
let resp = broker
.handle(
CommsRequest::AckInbox {
message_ids: vec![m1.clone()],
room: None,
to_seq: None,
},
&mut bob,
&tx,
)
.await;
match resp {
CommsResponse::Acked {
acked,
cursors_advanced,
} => {
assert_eq!(acked, 1);
assert_eq!(cursors_advanced, vec![("r".to_string(), 1)]);
}
other => panic!("expected Acked, got {other:?}"),
}
let bob_after = inbox(&broker, &mut bob, &tx).await;
assert_eq!(bob_after.len(), 1);
assert_eq!(bob_after[0].meta.subject, "second");
match broker
.handle(
CommsRequest::History {
room: room.clone(),
cursor: None,
limit: None,
},
&mut bob,
&tx,
)
.await
{
CommsResponse::History { messages, .. } => assert_eq!(messages.len(), 2),
other => panic!("expected History, got {other:?}"),
}
assert_eq!(inbox(&broker, &mut carol, &tx).await.len(), 2);
}
#[tokio::test]
async fn ack_to_seq_bulk_clears_room() {
let (_d, broker) = temp_broker();
let (tx, _rx) = mpsc::channel(64);
let room = RoomId::parse("r").expect("r");
let mut alice = hello_join(&broker, &tx, "alice", &room).await;
let mut bob = hello_join(&broker, &tx, "bob", &room).await;
for i in 0..3 {
post(&broker, &mut alice, &tx, &room, &format!("m{i}")).await;
}
assert_eq!(inbox(&broker, &mut bob, &tx).await.len(), 3);
let resp = broker
.handle(
CommsRequest::AckInbox {
message_ids: vec![],
room: Some(room.clone()),
to_seq: Some(3),
},
&mut bob,
&tx,
)
.await;
assert!(matches!(resp, CommsResponse::Acked { acked: 0, .. }));
assert!(inbox(&broker, &mut bob, &tx).await.is_empty());
}
#[tokio::test]
async fn ack_does_not_report_phantom_advance() {
let (_d, broker) = temp_broker();
let (tx, _rx) = mpsc::channel(64);
let room = RoomId::parse("r").expect("r");
let mut alice = hello_join(&broker, &tx, "alice", &room).await;
let mut bob = hello_join(&broker, &tx, "bob", &room).await;
post(&broker, &mut alice, &tx, &room, "m0").await;
let resp = broker
.handle(
CommsRequest::AckInbox {
message_ids: vec![],
room: Some(room.clone()),
to_seq: Some(0),
},
&mut bob,
&tx,
)
.await;
match resp {
CommsResponse::Acked {
acked,
cursors_advanced,
} => {
assert_eq!(acked, 0);
assert!(
cursors_advanced.is_empty(),
"to_seq=0 must not report a phantom advance"
);
}
other => panic!("expected Acked, got {other:?}"),
}
let _ = broker
.handle(
CommsRequest::AckInbox {
message_ids: vec![],
room: Some(room.clone()),
to_seq: Some(1),
},
&mut bob,
&tx,
)
.await;
let resp2 = broker
.handle(
CommsRequest::AckInbox {
message_ids: vec![],
room: Some(room.clone()),
to_seq: Some(1),
},
&mut bob,
&tx,
)
.await;
match resp2 {
CommsResponse::Acked {
cursors_advanced, ..
} => assert!(
cursors_advanced.is_empty(),
"re-acking an already-acked seq must not report an advance"
),
other => panic!("expected Acked, got {other:?}"),
}
}
#[tokio::test]
async fn ack_with_no_input_is_rejected() {
let (_d, broker) = temp_broker();
let (tx, _rx) = mpsc::channel(8);
let room = RoomId::parse("r").expect("r");
let mut bob = hello_join(&broker, &tx, "bob", &room).await;
let resp = broker
.handle(
CommsRequest::AckInbox {
message_ids: vec![],
room: None,
to_seq: None,
},
&mut bob,
&tx,
)
.await;
assert!(matches!(resp, CommsResponse::Error { code, .. } if code == "empty_ack"));
}