use super::*;
use astrid_events::EventBus;
use astrid_events::ipc::IpcPayload;
fn publish_ipc_messages(bus: &EventBus, topic: &str, count: usize) {
for i in 0..count {
let msg = IpcMessage::new(
topic,
IpcPayload::Custom {
data: serde_json::json!({"i": i}),
},
uuid::Uuid::new_v4(),
);
bus.publish(AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg,
});
}
}
#[test]
fn drain_receiver_collects_all_available_messages() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("test.topic");
publish_ipc_messages(&bus, "test.topic", 5);
let result = drain_receiver(&mut receiver, 10 * 1024 * 1024);
assert_eq!(result.messages.len(), 5);
assert_eq!(result.dropped, 0);
assert_eq!(result.lagged, 0);
}
#[test]
fn drain_receiver_returns_empty_when_no_messages() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("test.topic");
let result = drain_receiver(&mut receiver, 10 * 1024 * 1024);
assert!(result.messages.is_empty());
assert_eq!(result.dropped, 0);
assert_eq!(result.lagged, 0);
}
#[test]
fn drain_receiver_drops_on_buffer_overflow() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("test.topic");
publish_ipc_messages(&bus, "test.topic", 10);
let result = drain_receiver(&mut receiver, 50);
assert!(
result.messages.len() < 10,
"should not have drained all 10 with 50-byte limit, got {}",
result.messages.len()
);
assert_eq!(
result.dropped, 1,
"one message should be dropped on overflow"
);
}
#[test]
fn drain_receiver_surfaces_lag_from_broadcast_overflow() {
let bus = EventBus::with_capacity(2);
let mut receiver = bus.subscribe_topic("test.topic");
publish_ipc_messages(&bus, "test.topic", 5);
let result = drain_receiver(&mut receiver, 10 * 1024 * 1024);
assert!(
result.lagged > 0,
"expected lag from broadcast overflow, got 0"
);
assert!(
!result.messages.is_empty(),
"should still get some messages after lag"
);
}
#[test]
fn serialize_envelope_produces_valid_json_with_all_fields() {
let msg = IpcMessage::new(
"test.topic",
IpcPayload::Custom {
data: serde_json::json!({"hello": "world"}),
},
uuid::Uuid::new_v4(),
);
let result = DrainResult {
messages: vec![msg],
dropped: 2,
lagged: 5,
};
let json = serialize_envelope(&result).expect("serialization should succeed");
let parsed: serde_json::Value = serde_json::from_str(&json).expect("should be valid JSON");
assert_eq!(
parsed["messages"]
.as_array()
.expect("messages is array")
.len(),
1
);
assert_eq!(parsed["dropped"], 2);
assert_eq!(parsed["lagged"], 5);
}
#[test]
fn serialize_envelope_empty_messages() {
let result = DrainResult {
messages: vec![],
dropped: 0,
lagged: 0,
};
let json = serialize_envelope(&result).expect("serialization should succeed");
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert!(parsed["messages"].as_array().unwrap().is_empty());
assert_eq!(parsed["dropped"], 0);
assert_eq!(parsed["lagged"], 0);
}
#[tokio::test]
async fn subscription_lifecycle_remove_and_reinsert() {
let bus = EventBus::new();
let receiver = bus.subscribe_topic("test.*");
let mut subscriptions = std::collections::HashMap::new();
let handle_id: u64 = 42;
subscriptions.insert(handle_id, receiver);
let mut removed = subscriptions
.remove(&handle_id)
.expect("should find handle");
assert!(
!subscriptions.contains_key(&handle_id),
"handle should be gone after remove"
);
publish_ipc_messages(&bus, "test.foo", 3);
let result = drain_receiver(&mut removed, 10 * 1024 * 1024);
assert_eq!(result.messages.len(), 3, "receiver should work outside map");
subscriptions.insert(handle_id, removed);
assert!(
subscriptions.contains_key(&handle_id),
"handle should be back after re-insert"
);
publish_ipc_messages(&bus, "test.bar", 2);
let receiver = subscriptions.get_mut(&handle_id).unwrap();
let result = drain_receiver(receiver, 10 * 1024 * 1024);
assert_eq!(
result.messages.len(),
2,
"receiver should work after re-insert"
);
}
#[tokio::test]
async fn recv_blocking_wake_plus_drain_burst() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("burst.*");
let bus_clone = bus.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
publish_ipc_messages(&bus_clone, "burst.test", 5);
});
let first = tokio::time::timeout(std::time::Duration::from_secs(5), receiver.recv())
.await
.expect("should not timeout")
.expect("should get a message");
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let drain = drain_receiver(&mut receiver, 10 * 1024 * 1024);
let mut total = drain.messages.len() + 1; if let AstridEvent::Ipc { .. } = &*first {
} else {
total -= 1;
}
assert_eq!(
total, 5,
"should get all 5 messages (1 from recv + rest from drain)"
);
}
fn deserialize_publish_payload(payload_bytes: &[u8]) -> Result<IpcPayload, String> {
serde_json::from_slice::<serde_json::Value>(payload_bytes)
.map(IpcPayload::from_json_value)
.map_err(|_| "IPC payload is not valid JSON".into())
}
#[test]
fn publish_unknown_type_tag_produces_custom() {
let input = serde_json::json!({"type": "my_plugin_msg", "foo": 1});
let bytes = serde_json::to_vec(&input).unwrap();
let payload = deserialize_publish_payload(&bytes).unwrap();
match payload {
IpcPayload::Custom { data } => {
assert_eq!(data["type"], "my_plugin_msg");
assert_eq!(data["foo"], 1);
},
other => panic!("expected Custom, got {other:?}"),
}
}
#[test]
fn publish_missing_type_tag_produces_custom() {
let input = serde_json::json!({"foo": 1, "bar": "baz"});
let bytes = serde_json::to_vec(&input).unwrap();
let payload = deserialize_publish_payload(&bytes).unwrap();
match payload {
IpcPayload::Custom { data } => {
assert_eq!(data["foo"], 1);
},
other => panic!("expected Custom, got {other:?}"),
}
}
#[test]
fn publish_non_object_payload_produces_custom() {
let bytes = br#""hello""#;
let payload = deserialize_publish_payload(bytes).unwrap();
match payload {
IpcPayload::Custom { data } => {
assert_eq!(data, "hello");
},
other => panic!("expected Custom, got {other:?}"),
}
}
#[test]
fn publish_known_type_tag_deserializes_correctly() {
let input = serde_json::json!({"type": "connect"});
let bytes = serde_json::to_vec(&input).unwrap();
let payload = deserialize_publish_payload(&bytes).unwrap();
assert_eq!(payload, IpcPayload::Connect);
}
#[test]
fn publish_known_tag_with_malformed_fields_falls_to_custom() {
let input = serde_json::json!({"type": "user_input"});
let bytes = serde_json::to_vec(&input).unwrap();
let payload = deserialize_publish_payload(&bytes).unwrap();
match payload {
IpcPayload::Custom { data } => {
assert_eq!(data["type"], "user_input");
},
other => panic!("expected Custom (malformed known tag), got {other:?}"),
}
}
#[test]
fn publish_custom_tag_with_data_unwraps_inner_value() {
let input = serde_json::json!({"type": "custom", "data": {"foo": 1}});
let bytes = serde_json::to_vec(&input).unwrap();
let payload = deserialize_publish_payload(&bytes).unwrap();
match payload {
IpcPayload::Custom { data } => {
assert_eq!(data, serde_json::json!({"foo": 1}));
},
other => panic!("expected Custom with inner data, got {other:?}"),
}
}
#[test]
fn publish_invalid_json_is_error() {
let result = deserialize_publish_payload(b"not json at all");
assert!(result.is_err());
}
#[tokio::test]
async fn cancellation_token_unblocks_recv() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("cancel.test");
let cancel_token = tokio_util::sync::CancellationToken::new();
let token = cancel_token.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
token.cancel();
});
let start = std::time::Instant::now();
let event = tokio::select! {
result = tokio::time::timeout(
std::time::Duration::from_millis(60_000),
receiver.recv(),
) => result.ok().flatten(),
() = cancel_token.cancelled() => None,
};
let elapsed = start.elapsed();
assert!(event.is_none(), "should return None on cancellation");
assert!(
elapsed < std::time::Duration::from_millis(500),
"should unblock promptly, took {elapsed:?}"
);
}
#[test]
fn protected_interceptor_handle_rejects_unsubscribe() {
let bus = EventBus::new();
let receiver = bus.subscribe_topic("interceptor.topic");
let mut subscriptions = std::collections::HashMap::new();
let handle_id: u64 = 1;
subscriptions.insert(handle_id, receiver);
let result = remove_subscription(&mut subscriptions, true, handle_id);
assert!(
result.is_err(),
"should reject unsubscribe on protected handle",
);
assert!(
result
.unwrap_err()
.to_string()
.contains("runtime-owned interceptor handle"),
"error should mention interceptor handle",
);
assert!(
subscriptions.contains_key(&handle_id),
"protected subscription must survive unsubscribe attempt",
);
}
#[test]
fn guest_handle_allows_unsubscribe() {
let bus = EventBus::new();
let receiver = bus.subscribe_topic("guest.topic");
let mut subscriptions = std::collections::HashMap::new();
let handle_id: u64 = 99;
subscriptions.insert(handle_id, receiver);
let result = remove_subscription(&mut subscriptions, false, handle_id);
assert!(result.is_ok(), "should allow unsubscribe on guest handle");
assert!(
!subscriptions.contains_key(&handle_id),
"subscription should be gone after removal",
);
}
#[test]
fn unsubscribe_nonexistent_handle_returns_not_found() {
let mut subscriptions = std::collections::HashMap::new();
let result = remove_subscription(&mut subscriptions, false, 42);
assert!(result.is_err(), "should reject nonexistent handle");
assert!(
result.unwrap_err().to_string().contains("not found"),
"error should mention not found",
);
}
#[test]
fn subscribe_acl_empty_patterns_denies() {
let err = check_subscribe_acl("test-capsule", "any.topic", &[]).unwrap_err();
assert!(err.contains("no ipc_subscribe declarations"));
}
#[test]
fn subscribe_acl_exact_match_allows() {
let patterns = vec!["agent.v1.response".into()];
assert!(check_subscribe_acl("test", "agent.v1.response", &patterns).is_ok());
}
#[test]
fn subscribe_acl_wildcard_matches_concrete() {
let patterns = vec!["registry.v1.*".into()];
assert!(check_subscribe_acl("test", "registry.v1.providers", &patterns).is_ok());
}
#[test]
fn subscribe_acl_segment_count_mismatch_denies() {
let patterns = vec!["registry.v1.*".into()];
let err = check_subscribe_acl("test", "registry.v1.selection.callback", &patterns).unwrap_err();
assert!(err.contains("not allowed to subscribe"));
}
#[test]
fn subscribe_acl_wildcard_subscription_matches_wildcard_acl() {
let patterns = vec!["foo.*".into()];
assert!(check_subscribe_acl("test", "foo.*", &patterns).is_ok());
}
#[test]
fn subscribe_acl_wildcard_subscription_denied_by_exact_acl() {
let patterns = vec!["foo.bar".into()];
let result = check_subscribe_acl("test", "foo.*", &patterns);
assert!(
result.is_err(),
"wildcard subscription must be denied by exact ACL"
);
}
#[test]
fn subscribe_acl_unrelated_topic_denies() {
let patterns = vec!["agent.v1.*".into()];
let err = check_subscribe_acl("test", "session.v1.clear", &patterns).unwrap_err();
assert!(err.contains("not allowed to subscribe"));
}
#[test]
fn subscribe_acl_multiple_patterns_second_matches() {
let patterns = vec!["agent.v1.*".into(), "session.v1.response.*".into()];
assert!(check_subscribe_acl("test", "session.v1.response.abc", &patterns).is_ok());
}
#[test]
fn subscribe_acl_malformed_pattern_silently_denies() {
let patterns = vec!["foo..bar".into()];
let result = check_subscribe_acl("test", "foo.x.bar", &patterns);
assert!(
result.is_err(),
"malformed ACL pattern must not allow subscriptions"
);
}