use crate::io::{atomic::atomic_swap, error::InboxError, hash::compute_hash, lock::acquire_lock};
use crate::schema::InboxMessage;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
/// Outcome reported by the inbox write path in this module.
#[derive(Debug, Clone, PartialEq)]
pub enum WriteOutcome {
/// The write completed without a detected conflict, or the modification
/// callback reported that nothing needed to be written.
Success,
/// The inbox content changed between the read and the swap; the displaced
/// content was merged back in. `merged_messages` is the number of messages
/// the merge added on top of the caller's own list.
ConflictResolved { merged_messages: usize },
/// The lock could not be acquired (`InboxError::LockTimeout`), so the message
/// was handed to the spool instead; `spool_path` is the path the spool returned.
Queued { spool_path: PathBuf },
}
/// Appends `message` to the inbox file at `inbox_path`.
///
/// If the message carries a `message_id` that is already present in the inbox,
/// nothing is written and `WriteOutcome::Success` is returned.
///
/// # Errors
///
/// A `InboxError::LockTimeout` from the write path is not surfaced: the message
/// is passed to `crate::io::spool::spool_message` for `team`/`agent` and
/// `WriteOutcome::Queued` is returned (a spool failure is propagated). Every
/// other error from the write path is returned unchanged.
pub fn inbox_append(
    inbox_path: &Path,
    message: &InboxMessage,
    team: &str,
    agent: &str,
) -> Result<WriteOutcome, InboxError> {
    // The closure takes ownership of its own copy; `message` itself is still
    // needed afterwards for the spool fallback.
    let pending = message.clone();

    let write_result = atomic_write_with_conflict_check(inbox_path, |messages| {
        let already_delivered = pending.message_id.as_ref().is_some_and(|id| {
            messages
                .iter()
                .any(|existing| existing.message_id.as_ref() == Some(id))
        });
        if already_delivered {
            // Returning false tells the writer to leave the file untouched.
            return false;
        }
        messages.push(pending);
        true
    });

    match write_result {
        Err(InboxError::LockTimeout { .. }) => {
            let spool_path = crate::io::spool::spool_message(team, agent, message)?;
            Ok(WriteOutcome::Queued { spool_path })
        }
        other => other,
    }
}
/// Applies `update_fn` to the messages stored at `inbox_path` and writes the
/// result back through the conflict-checked write path.
///
/// The file is always rewritten after `update_fn` runs. `_team` and `_agent`
/// are accepted but not used by this function.
///
/// # Errors
///
/// Any error from the write path is returned as-is, including
/// `InboxError::LockTimeout` (there is no spool fallback here).
pub fn inbox_update<F>(
    inbox_path: &Path,
    _team: &str,
    _agent: &str,
    update_fn: F,
) -> Result<(), InboxError>
where
    F: FnOnce(&mut Vec<InboxMessage>),
{
    atomic_write_with_conflict_check(inbox_path, |messages| {
        update_fn(messages);
        // Always request a write, whatever the callback did.
        true
    })
    .map(|_outcome| ())
}
fn atomic_write_with_conflict_check<F>(
inbox_path: &Path,
modify_fn: F,
) -> Result<WriteOutcome, InboxError>
where
F: FnOnce(&mut Vec<InboxMessage>) -> bool,
{
let lock_path = inbox_path.with_extension("lock");
let tmp_path = inbox_path.with_extension("tmp");
let _lock = acquire_lock(&lock_path, 5)?;
let (mut messages, original_hash) = if inbox_path.exists() {
let content = fs::read(inbox_path).map_err(|e| InboxError::Io {
path: inbox_path.to_path_buf(),
source: e,
})?;
let hash = compute_hash(&content);
let msgs: Vec<InboxMessage> =
serde_json::from_slice(&content).map_err(|e| InboxError::Json {
path: inbox_path.to_path_buf(),
source: e,
})?;
(msgs, hash)
} else {
(Vec::new(), compute_hash(b"[]"))
};
if !modify_fn(&mut messages) {
return Ok(WriteOutcome::Success);
}
let new_content =
serde_json::to_vec_pretty(&messages).map_err(|e| InboxError::Json {
path: tmp_path.clone(),
source: e,
})?;
{
let mut tmp_file = fs::File::create(&tmp_path).map_err(|e| InboxError::Io {
path: tmp_path.clone(),
source: e,
})?;
tmp_file
.write_all(&new_content)
.map_err(|e| InboxError::Io {
path: tmp_path.clone(),
source: e,
})?;
tmp_file.sync_all().map_err(|e| InboxError::Io {
path: tmp_path.clone(),
source: e,
})?;
}
if !inbox_path.exists() {
fs::rename(&tmp_path, inbox_path).map_err(|e| InboxError::Io {
path: inbox_path.to_path_buf(),
source: e,
})?;
return Ok(WriteOutcome::Success);
}
atomic_swap(inbox_path, &tmp_path)?;
let displaced_content = fs::read(&tmp_path).map_err(|e| InboxError::Io {
path: tmp_path.clone(),
source: e,
})?;
let displaced_hash = compute_hash(&displaced_content);
let outcome = if displaced_hash != original_hash {
let displaced_messages: Vec<InboxMessage> =
serde_json::from_slice(&displaced_content).map_err(|e| InboxError::Json {
path: tmp_path.clone(),
source: e,
})?;
let merged = merge_messages(&messages, &displaced_messages);
let merge_count = merged.len() - messages.len();
let merged_content =
serde_json::to_vec_pretty(&merged).map_err(|e| InboxError::Json {
path: tmp_path.clone(),
source: e,
})?;
fs::write(&tmp_path, &merged_content).map_err(|e| InboxError::Io {
path: tmp_path.clone(),
source: e,
})?;
atomic_swap(inbox_path, &tmp_path)?;
WriteOutcome::ConflictResolved {
merged_messages: merge_count,
}
} else {
WriteOutcome::Success
};
let _ = fs::remove_file(&tmp_path);
Ok(outcome)
}
pub fn inbox_read_merged(
team_dir: &Path,
agent_name: &str,
hostname_registry: Option<&crate::config::HostnameRegistry>,
) -> Result<Vec<InboxMessage>, InboxError> {
let inboxes_dir = team_dir.join("inboxes");
if !inboxes_dir.exists() {
return Ok(Vec::new());
}
let mut all_messages = Vec::new();
let mut seen_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
let entries = fs::read_dir(&inboxes_dir).map_err(|e| InboxError::Io {
path: inboxes_dir.clone(),
source: e,
})?;
for entry in entries {
let entry = entry.map_err(|e| InboxError::Io {
path: inboxes_dir.clone(),
source: e,
})?;
let path = entry.path();
if !path.is_file() {
continue;
}
let file_name = match path.file_name().and_then(|n| n.to_str()) {
Some(name) => name,
None => continue,
};
if !file_name.ends_with(".json") {
continue;
}
let is_match = if file_name == format!("{agent_name}.json") {
true
} else if let Some(stem) = file_name.strip_suffix(".json") {
if let Some(suffix) = stem.strip_prefix(&format!("{agent_name}.")) {
if let Some(registry) = hostname_registry {
registry.is_known_hostname(suffix)
} else {
false
}
} else {
false
}
} else {
false
};
if !is_match {
continue;
}
let content = fs::read(&path).map_err(|e| InboxError::Io {
path: path.clone(),
source: e,
})?;
let messages: Vec<InboxMessage> =
serde_json::from_slice(&content).map_err(|e| InboxError::Json {
path: path.clone(),
source: e,
})?;
for msg in messages {
if let Some(ref msg_id) = msg.message_id {
if seen_ids.contains(msg_id) {
continue; }
seen_ids.insert(msg_id.clone());
}
all_messages.push(msg);
}
}
all_messages.sort_by(|a, b| {
match a.timestamp.cmp(&b.timestamp) {
std::cmp::Ordering::Equal => {
match (&a.message_id, &b.message_id) {
(Some(id_a), Some(id_b)) => id_a.cmp(id_b),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
}
}
other => other,
}
});
Ok(all_messages)
}
/// Combines two message lists, keeping all of `our_messages` and adding each
/// entry of `their_messages` that is not already represented in ours.
///
/// A message with a `message_id` counts as present when one of our messages
/// has the same id; a message without an id counts as present when one of our
/// messages has the same `from`, `text`, and `timestamp`. The combined list is
/// returned sorted by `timestamp`.
fn merge_messages(
    our_messages: &[InboxMessage],
    their_messages: &[InboxMessage],
) -> Vec<InboxMessage> {
    let known_ids: std::collections::HashSet<_> = our_messages
        .iter()
        .filter_map(|m| m.message_id.as_ref())
        .collect();

    let is_new = |candidate: &InboxMessage| match candidate.message_id.as_ref() {
        Some(id) => !known_ids.contains(id),
        // No id to compare: fall back to a field-by-field match.
        None => !our_messages.iter().any(|ours| {
            ours.from == candidate.from
                && ours.text == candidate.text
                && ours.timestamp == candidate.timestamp
        }),
    };

    let mut combined = our_messages.to_vec();
    combined.extend(their_messages.iter().filter(|&m| is_new(m)).cloned());
    combined.sort_by(|left, right| left.timestamp.cmp(&right.timestamp));
    combined
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use tempfile::TempDir;

    /// Builds an unread message stamped with the current UTC time.
    fn create_test_message(from: &str, text: &str, message_id: Option<String>) -> InboxMessage {
        InboxMessage {
            from: from.to_string(),
            text: text.to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
            read: false,
            summary: None,
            message_id,
            unknown_fields: HashMap::new(),
        }
    }

    /// Appending to a path with no file creates the inbox with one message.
    #[test]
    fn test_inbox_append_new_file() {
        let temp_dir = TempDir::new().unwrap();
        let inbox_path = temp_dir.path().join("agent.json");
        let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
        let outcome = inbox_append(&inbox_path, &message, "test-team", "test-agent").unwrap();
        assert_eq!(outcome, WriteOutcome::Success);
        let content = fs::read_to_string(&inbox_path).unwrap();
        let messages: Vec<InboxMessage> = serde_json::from_str(&content).unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].from, "team-lead");
        assert_eq!(messages[0].text, "Test message");
    }

    /// A second append keeps the first message and adds the new one after it.
    #[test]
    fn test_inbox_append_existing_file() {
        let temp_dir = TempDir::new().unwrap();
        let inbox_path = temp_dir.path().join("agent.json");
        let msg1 = create_test_message("team-lead", "Message 1", Some("msg-001".to_string()));
        inbox_append(&inbox_path, &msg1, "test-team", "test-agent").unwrap();
        let msg2 = create_test_message("ci-agent", "Message 2", Some("msg-002".to_string()));
        let outcome = inbox_append(&inbox_path, &msg2, "test-team", "test-agent").unwrap();
        assert_eq!(outcome, WriteOutcome::Success);
        let content = fs::read_to_string(&inbox_path).unwrap();
        let messages: Vec<InboxMessage> = serde_json::from_str(&content).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].text, "Message 1");
        assert_eq!(messages[1].text, "Message 2");
    }

    /// Appending the same `message_id` twice leaves a single copy.
    #[test]
    fn test_inbox_append_deduplication() {
        let temp_dir = TempDir::new().unwrap();
        let inbox_path = temp_dir.path().join("agent.json");
        let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
        inbox_append(&inbox_path, &message, "test-team", "test-agent").unwrap();
        let outcome = inbox_append(&inbox_path, &message, "test-team", "test-agent").unwrap();
        assert_eq!(outcome, WriteOutcome::Success);
        let content = fs::read_to_string(&inbox_path).unwrap();
        let messages: Vec<InboxMessage> = serde_json::from_str(&content).unwrap();
        assert_eq!(messages.len(), 1);
    }

    /// Messages sharing an id across both lists appear once in the merge.
    #[test]
    fn test_merge_messages_no_duplicates() {
        let msg1 = create_test_message("team-lead", "Message 1", Some("msg-001".to_string()));
        let msg2 = create_test_message("ci-agent", "Message 2", Some("msg-002".to_string()));
        let msg3 = create_test_message("qa-agent", "Message 3", Some("msg-003".to_string()));
        let our_messages = vec![msg1.clone(), msg2.clone()];
        let their_messages = vec![msg1.clone(), msg3.clone()];
        let merged = merge_messages(&our_messages, &their_messages);
        assert_eq!(merged.len(), 3);
        assert!(merged.iter().any(|m| m.message_id == Some("msg-001".to_string())));
        assert!(merged.iter().any(|m| m.message_id == Some("msg-002".to_string())));
        assert!(merged.iter().any(|m| m.message_id == Some("msg-003".to_string())));
    }

    /// The merged list is ordered by timestamp regardless of source list.
    #[test]
    fn test_merge_messages_preserves_order() {
        let mut msg1 = create_test_message("team-lead", "Message 1", Some("msg-001".to_string()));
        msg1.timestamp = "2026-02-11T10:00:00Z".to_string();
        let mut msg2 = create_test_message("ci-agent", "Message 2", Some("msg-002".to_string()));
        msg2.timestamp = "2026-02-11T11:00:00Z".to_string();
        let mut msg3 = create_test_message("qa-agent", "Message 3", Some("msg-003".to_string()));
        msg3.timestamp = "2026-02-11T10:30:00Z".to_string();
        let our_messages = vec![msg1.clone(), msg2.clone()];
        let their_messages = vec![msg3.clone()];
        let merged = merge_messages(&our_messages, &their_messages);
        assert_eq!(merged.len(), 3);
        assert_eq!(merged[0].timestamp, "2026-02-11T10:00:00Z");
        assert_eq!(merged[1].timestamp, "2026-02-11T10:30:00Z");
        assert_eq!(merged[2].timestamp, "2026-02-11T11:00:00Z");
    }

    /// Id-less messages are deduplicated on sender, text, and timestamp.
    #[test]
    fn test_merge_messages_without_message_id() {
        let mut msg1 = create_test_message("team-lead", "Unique message", None);
        msg1.timestamp = "2026-02-11T10:00:00Z".to_string();
        let mut msg2 = create_test_message("team-lead", "Unique message", None);
        msg2.timestamp = "2026-02-11T10:00:00Z".to_string();
        let our_messages = vec![msg1.clone()];
        let their_messages = vec![msg2.clone()];
        let merged = merge_messages(&our_messages, &their_messages);
        assert_eq!(merged.len(), 1);
    }

    /// Fields not known to the schema survive a read-append-write cycle.
    #[test]
    fn test_inbox_append_preserves_unknown_fields() {
        let temp_dir = TempDir::new().unwrap();
        let inbox_path = temp_dir.path().join("agent.json");
        let json = r#"[{
"from": "team-lead",
"text": "Existing message",
"timestamp": "2026-02-11T10:00:00Z",
"read": false,
"unknownField": "should be preserved",
"futureFeature": {"nested": "data"}
}]"#;
        fs::write(&inbox_path, json).unwrap();
        let new_message =
            create_test_message("ci-agent", "New message", Some("msg-002".to_string()));
        inbox_append(&inbox_path, &new_message, "test-team", "test-agent").unwrap();
        let content = fs::read_to_string(&inbox_path).unwrap();
        let messages: Vec<InboxMessage> = serde_json::from_str(&content).unwrap();
        assert_eq!(messages.len(), 2);
        assert!(messages[0].unknown_fields.contains_key("unknownField"));
        assert!(messages[0].unknown_fields.contains_key("futureFeature"));
    }

    /// `inbox_update` persists in-place mutations made by the callback.
    #[test]
    fn test_inbox_update_marks_read() {
        let temp_dir = TempDir::new().unwrap();
        let inbox_path = temp_dir.path().join("agent.json");
        let msg1 = create_test_message("user-a", "Message 1", Some("msg-001".to_string()));
        let msg2 = create_test_message("user-b", "Message 2", Some("msg-002".to_string()));
        inbox_append(&inbox_path, &msg1, "test-team", "test-agent").unwrap();
        inbox_append(&inbox_path, &msg2, "test-team", "test-agent").unwrap();
        inbox_update(&inbox_path, "test-team", "test-agent", |messages| {
            for msg in messages.iter_mut() {
                msg.read = true;
            }
        })
        .unwrap();
        let content = fs::read_to_string(&inbox_path).unwrap();
        let messages: Vec<InboxMessage> = serde_json::from_str(&content).unwrap();
        assert_eq!(messages.len(), 2);
        assert!(messages[0].read);
        assert!(messages[1].read);
    }

    /// A concurrent update and append must not lose any message.
    #[test]
    fn test_inbox_update_concurrent_writes() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let temp_dir = TempDir::new().unwrap();
        let inbox_path = temp_dir.path().join("agent.json");
        let msg1 = create_test_message("user-a", "Message 1", Some("msg-001".to_string()));
        let msg2 = create_test_message("user-b", "Message 2", Some("msg-002".to_string()));
        inbox_append(&inbox_path, &msg1, "test-team", "test-agent").unwrap();
        inbox_append(&inbox_path, &msg2, "test-team", "test-agent").unwrap();
        let inbox_path = Arc::new(inbox_path);
        // The barrier releases both threads together to maximise overlap.
        let barrier = Arc::new(Barrier::new(2));
        let path1 = Arc::clone(&inbox_path);
        let barrier1 = Arc::clone(&barrier);
        let handle1 = thread::spawn(move || {
            barrier1.wait();
            inbox_update(&path1, "test-team", "test-agent", |messages| {
                for msg in messages.iter_mut() {
                    msg.read = true;
                }
            })
            .unwrap();
        });
        let path2 = Arc::clone(&inbox_path);
        let barrier2 = Arc::clone(&barrier);
        let handle2 = thread::spawn(move || {
            barrier2.wait();
            let msg3 = create_test_message("user-c", "Message 3", Some("msg-003".to_string()));
            inbox_append(&path2, &msg3, "test-team", "test-agent").unwrap();
        });
        handle1.join().unwrap();
        handle2.join().unwrap();
        let content = fs::read_to_string(&*inbox_path).unwrap();
        let messages: Vec<InboxMessage> = serde_json::from_str(&content).unwrap();
        assert_eq!(messages.len(), 3, "No messages should be lost");
        assert!(
            messages.iter().any(|m| m.message_id == Some("msg-001".to_string())),
            "msg-001 should be present"
        );
        assert!(
            messages.iter().any(|m| m.message_id == Some("msg-002".to_string())),
            "msg-002 should be present"
        );
        assert!(
            messages.iter().any(|m| m.message_id == Some("msg-003".to_string())),
            "msg-003 should be present"
        );
    }

    /// A team directory without `inboxes/` yields an empty result.
    #[test]
    fn test_inbox_read_merged_empty_directory() {
        let temp_dir = TempDir::new().unwrap();
        let team_dir = temp_dir.path();
        let messages = super::inbox_read_merged(team_dir, "agent-1", None).unwrap();
        assert!(messages.is_empty());
    }

    /// With no registry, only the local `<agent>.json` file is read.
    #[test]
    fn test_inbox_read_merged_local_only() {
        let temp_dir = TempDir::new().unwrap();
        let team_dir = temp_dir.path();
        let inboxes_dir = team_dir.join("inboxes");
        fs::create_dir_all(&inboxes_dir).unwrap();
        let inbox_path = inboxes_dir.join("agent-1.json");
        let msg1 = create_test_message("user-a", "Local message 1", Some("msg-001".to_string()));
        let msg2 = create_test_message("user-b", "Local message 2", Some("msg-002".to_string()));
        let json = serde_json::to_string_pretty(&vec![msg1, msg2]).unwrap();
        fs::write(&inbox_path, json).unwrap();
        let messages = super::inbox_read_merged(team_dir, "agent-1", None).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].text, "Local message 1");
        assert_eq!(messages[1].text, "Local message 2");
    }

    /// Local and registered-origin files are merged in timestamp order.
    #[test]
    fn test_inbox_read_merged_with_origin_files() {
        use crate::config::{HostnameRegistry, RemoteConfig};
        let temp_dir = TempDir::new().unwrap();
        let team_dir = temp_dir.path();
        let inboxes_dir = team_dir.join("inboxes");
        fs::create_dir_all(&inboxes_dir).unwrap();
        let mut registry = HostnameRegistry::new();
        registry
            .register(RemoteConfig {
                hostname: "remote1".to_string(),
                address: "user@remote1".to_string(),
                ssh_key_path: None,
                aliases: Vec::new(),
            })
            .unwrap();
        registry
            .register(RemoteConfig {
                hostname: "remote2".to_string(),
                address: "user@remote2".to_string(),
                ssh_key_path: None,
                aliases: Vec::new(),
            })
            .unwrap();
        let local_path = inboxes_dir.join("agent-1.json");
        let mut msg1 = create_test_message("user-a", "Local message", Some("msg-001".to_string()));
        msg1.timestamp = "2026-02-11T10:00:00Z".to_string();
        fs::write(&local_path, serde_json::to_string_pretty(&vec![msg1]).unwrap()).unwrap();
        let origin1_path = inboxes_dir.join("agent-1.remote1.json");
        let mut msg2 =
            create_test_message("user-b", "Remote1 message", Some("msg-002".to_string()));
        msg2.timestamp = "2026-02-11T10:05:00Z".to_string();
        fs::write(&origin1_path, serde_json::to_string_pretty(&vec![msg2]).unwrap()).unwrap();
        let origin2_path = inboxes_dir.join("agent-1.remote2.json");
        let mut msg3 =
            create_test_message("user-c", "Remote2 message", Some("msg-003".to_string()));
        msg3.timestamp = "2026-02-11T10:10:00Z".to_string();
        fs::write(&origin2_path, serde_json::to_string_pretty(&vec![msg3]).unwrap()).unwrap();
        let messages = super::inbox_read_merged(team_dir, "agent-1", Some(&registry)).unwrap();
        assert_eq!(messages.len(), 3);
        assert_eq!(messages[0].text, "Local message");
        assert_eq!(messages[1].text, "Remote1 message");
        assert_eq!(messages[2].text, "Remote2 message");
    }

    /// The same `message_id` in two files yields one message.
    #[test]
    fn test_inbox_read_merged_deduplication() {
        use crate::config::{HostnameRegistry, RemoteConfig};
        let temp_dir = TempDir::new().unwrap();
        let team_dir = temp_dir.path();
        let inboxes_dir = team_dir.join("inboxes");
        fs::create_dir_all(&inboxes_dir).unwrap();
        let mut registry = HostnameRegistry::new();
        registry
            .register(RemoteConfig {
                hostname: "remote1".to_string(),
                address: "user@remote1".to_string(),
                ssh_key_path: None,
                aliases: Vec::new(),
            })
            .unwrap();
        let local_path = inboxes_dir.join("agent-1.json");
        let msg1 = create_test_message("user-a", "First occurrence", Some("msg-001".to_string()));
        fs::write(&local_path, serde_json::to_string_pretty(&vec![msg1]).unwrap()).unwrap();
        let origin_path = inboxes_dir.join("agent-1.remote1.json");
        let msg2 = create_test_message(
            "user-b",
            "Duplicate (should be dropped)",
            Some("msg-001".to_string()),
        );
        fs::write(&origin_path, serde_json::to_string_pretty(&vec![msg2]).unwrap()).unwrap();
        let messages = super::inbox_read_merged(team_dir, "agent-1", Some(&registry)).unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].message_id, Some("msg-001".to_string()));
        // Which copy survives depends on directory iteration order, so either is accepted.
        assert!(
            messages[0].text == "First occurrence" || messages[0].text == "Duplicate (should be dropped)",
            "Expected one of the duplicate messages, got: {}",
            messages[0].text
        );
    }

    /// Messages without an id are never deduplicated across files.
    #[test]
    fn test_inbox_read_merged_no_message_id() {
        use crate::config::{HostnameRegistry, RemoteConfig};
        let temp_dir = TempDir::new().unwrap();
        let team_dir = temp_dir.path();
        let inboxes_dir = team_dir.join("inboxes");
        fs::create_dir_all(&inboxes_dir).unwrap();
        let mut registry = HostnameRegistry::new();
        registry
            .register(RemoteConfig {
                hostname: "remote1".to_string(),
                address: "user@remote1".to_string(),
                ssh_key_path: None,
                aliases: Vec::new(),
            })
            .unwrap();
        let local_path = inboxes_dir.join("agent-1.json");
        let msg1 = create_test_message("user-a", "Message without ID", None);
        fs::write(&local_path, serde_json::to_string_pretty(&vec![msg1]).unwrap()).unwrap();
        let origin_path = inboxes_dir.join("agent-1.remote1.json");
        let msg2 = create_test_message("user-b", "Another without ID", None);
        fs::write(&origin_path, serde_json::to_string_pretty(&vec![msg2]).unwrap()).unwrap();
        let messages = super::inbox_read_merged(team_dir, "agent-1", Some(&registry)).unwrap();
        assert_eq!(messages.len(), 2);
    }

    /// An agent name containing dots still matches its local and origin files.
    #[test]
    fn test_inbox_read_merged_agent_name_with_dots() {
        use crate::config::{HostnameRegistry, RemoteConfig};
        let temp_dir = TempDir::new().unwrap();
        let team_dir = temp_dir.path();
        let inboxes_dir = team_dir.join("inboxes");
        fs::create_dir_all(&inboxes_dir).unwrap();
        let mut registry = HostnameRegistry::new();
        registry
            .register(RemoteConfig {
                hostname: "mac-studio".to_string(),
                address: "user@mac".to_string(),
                ssh_key_path: None,
                aliases: Vec::new(),
            })
            .unwrap();
        let agent_name = "dev.agent";
        let local_path = inboxes_dir.join(format!("{agent_name}.json"));
        let msg1 = create_test_message("user-a", "Local", Some("msg-001".to_string()));
        fs::write(&local_path, serde_json::to_string_pretty(&vec![msg1]).unwrap()).unwrap();
        let origin_path = inboxes_dir.join(format!("{agent_name}.mac-studio.json"));
        let msg2 = create_test_message("user-b", "Remote", Some("msg-002".to_string()));
        fs::write(&origin_path, serde_json::to_string_pretty(&vec![msg2]).unwrap()).unwrap();
        let messages = super::inbox_read_merged(team_dir, agent_name, Some(&registry)).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].text, "Local");
        assert_eq!(messages[1].text, "Remote");
    }

    /// Origin files whose suffix is not a registered hostname are skipped.
    #[test]
    fn test_inbox_read_merged_ignores_unknown_hostnames() {
        use crate::config::{HostnameRegistry, RemoteConfig};
        let temp_dir = TempDir::new().unwrap();
        let team_dir = temp_dir.path();
        let inboxes_dir = team_dir.join("inboxes");
        fs::create_dir_all(&inboxes_dir).unwrap();
        let mut registry = HostnameRegistry::new();
        registry
            .register(RemoteConfig {
                hostname: "remote1".to_string(),
                address: "user@remote1".to_string(),
                ssh_key_path: None,
                aliases: Vec::new(),
            })
            .unwrap();
        let local_path = inboxes_dir.join("agent-1.json");
        let msg1 = create_test_message("user-a", "Local", Some("msg-001".to_string()));
        fs::write(&local_path, serde_json::to_string_pretty(&vec![msg1]).unwrap()).unwrap();
        let origin1_path = inboxes_dir.join("agent-1.remote1.json");
        let msg2 = create_test_message("user-b", "Remote1", Some("msg-002".to_string()));
        fs::write(&origin1_path, serde_json::to_string_pretty(&vec![msg2]).unwrap()).unwrap();
        let unknown_path = inboxes_dir.join("agent-1.unknown.json");
        let msg3 = create_test_message(
            "user-c",
            "Unknown (should be ignored)",
            Some("msg-003".to_string()),
        );
        fs::write(&unknown_path, serde_json::to_string_pretty(&vec![msg3]).unwrap()).unwrap();
        let messages = super::inbox_read_merged(team_dir, "agent-1", Some(&registry)).unwrap();
        assert_eq!(messages.len(), 2);
        assert!(messages.iter().any(|m| m.text == "Local"));
        assert!(messages.iter().any(|m| m.text == "Remote1"));
        assert!(!messages.iter().any(|m| m.text == "Unknown (should be ignored)"));
    }
}