/// Storage test double: wraps an in-memory store and carries a switch that
/// makes `put` fail on demand (see the `StorageAdapter` impl for this type).
///
/// The switch lives behind an `Arc`, so clones of this value share it.
#[derive(Clone)]
struct SwitchableFailStorage {
/// Backing in-memory store that the adapter methods forward to.
inner: nostr_double_ratchet_runtime::InMemoryStorage,
/// While `true`, `put` returns an injected storage error instead of writing.
fail_puts: Arc<std::sync::atomic::AtomicBool>,
}
impl SwitchableFailStorage {
    /// Creates a storage double backed by a fresh in-memory store, with
    /// failure injection switched off.
    fn new() -> Self {
        use std::sync::atomic::AtomicBool;

        let inner = nostr_double_ratchet_runtime::InMemoryStorage::new();
        let fail_puts = Arc::new(AtomicBool::new(false));
        Self { inner, fail_puts }
    }

    /// Turns injected `put` failures on (`true`) or off (`false`).
    fn set_fail_puts(&self, fail: bool) {
        use std::sync::atomic::Ordering;

        self.fail_puts.store(fail, Ordering::SeqCst);
    }
}
impl StorageAdapter for SwitchableFailStorage {
fn get(&self, key: &str) -> nostr_double_ratchet_runtime::Result<Option<String>> {
self.inner.get(key)
}
fn put(&self, key: &str, value: String) -> nostr_double_ratchet_runtime::Result<()> {
if self.fail_puts.load(std::sync::atomic::Ordering::SeqCst) {
return Err(nostr_double_ratchet_runtime::Error::Storage(
"injected storage failure".to_string(),
));
}
self.inner.put(key, value)
}
fn del(&self, key: &str) -> nostr_double_ratchet_runtime::Result<()> {
self.inner.del(key)
}
fn list(&self, prefix: &str) -> nostr_double_ratchet_runtime::Result<Vec<String>> {
self.inner.list(prefix)
}
}
/// Builds a `ProtocolSubscriptionPlan` for tests.
///
/// The plan carries the single runtime subscription `"ndr-protocol"`, empty
/// roster and invite author lists, no invite-response recipient, and the given
/// author keys rendered as hex strings.
fn protocol_plan_for_test(
    message_authors: Vec<PublicKey>,
    group_sender_key_authors: Vec<PublicKey>,
) -> ProtocolSubscriptionPlan {
    let message_authors = message_authors
        .into_iter()
        .map(|author| author.to_hex())
        .collect();
    let group_sender_key_authors = group_sender_key_authors
        .into_iter()
        .map(|author| author.to_hex())
        .collect();

    ProtocolSubscriptionPlan {
        runtime_subscriptions: vec![String::from("ndr-protocol")],
        roster_authors: Vec::new(),
        invite_authors: Vec::new(),
        message_authors,
        group_sender_key_authors,
        invite_response_recipient: None,
    }
}
/// Builds an unsigned rumor event and returns `(json, id)`.
///
/// * `author` – public key set as the rumor's author.
/// * `kind` – event kind; must fit in a `u16`, because `Kind::Custom` is built
///   from a `u16` here.
/// * `content` – rumor content.
/// * `created_at_secs` – creation timestamp in seconds.
/// * `tags` – raw tags; each inner vector is parsed with `nostr::Tag::parse`.
///
/// # Panics
///
/// Panics if `kind` does not fit in a `u16`, if a tag fails to parse, if the
/// rumor has no id after `ensure_id`, or if JSON serialization fails.
fn runtime_rumor_json(
    author: PublicKey,
    kind: u32,
    content: &str,
    created_at_secs: u64,
    tags: Vec<Vec<String>>,
) -> (String, String) {
    // Fail loudly rather than silently truncating an out-of-range kind, which
    // would produce a rumor of a different kind than the caller asked for.
    let kind = u16::try_from(kind).expect("runtime rumor kind fits in u16");
    let tags = tags
        .into_iter()
        .map(|tag| nostr::Tag::parse(tag).expect("runtime rumor tag"))
        .collect::<Vec<_>>();
    let mut rumor = UnsignedEvent::new(
        author,
        Timestamp::from_secs(created_at_secs),
        Kind::Custom(kind),
        tags,
        content.to_string(),
    );
    rumor.ensure_id();
    let id = rumor.id.as_ref().expect("runtime rumor id").to_string();
    let json = serde_json::to_string(&rumor).expect("runtime rumor json");
    (json, id)
}
/// Produces a message event sent from `sender_keys` to `receiver_engine`.
///
/// The sender accepts the receiver's local invite, the receiver observes the
/// resulting invite response, and the sender session then sends a chat-message
/// rumor carrying `body` stamped with `created_at_secs`.
///
/// # Panics
///
/// Panics if any step of that handshake or send fails.
fn appcore_direct_message_event_for_test(
    receiver_engine: &mut ProtocolEngine,
    sender_keys: &Keys,
    body: &str,
    created_at_secs: u64,
) -> Event {
    // Sender side: accept the receiver's invite to obtain a session.
    let receiver_invite = receiver_engine
        .local_invite_for_test()
        .expect("receiver local invite");
    let accepted = receiver_invite.accept_with_owner(
        sender_keys.public_key(),
        sender_keys.secret_key().to_secret_bytes(),
        Some(sender_keys.public_key().to_hex()),
        Some(sender_keys.public_key()),
    );
    let (mut session, invite_response) = accepted.expect("sender accepts receiver invite");

    // Receiver side: observe the invite response.
    let invite_response_event =
        invite_response_event(&invite_response).expect("invite response event");
    receiver_engine
        .observe_invite_response_event(&invite_response_event)
        .expect("receiver observes invite response");

    // Send the chat rumor through the sender session.
    let (rumor_json, _rumor_id) = runtime_rumor_json(
        sender_keys.public_key(),
        CHAT_MESSAGE_KIND,
        body,
        created_at_secs,
        Vec::new(),
    );
    let send_plan = session
        .plan_send(rumor_json.as_bytes(), NdrUnixSeconds(created_at_secs))
        .expect("sender plans message");
    let applied = session.apply_send(send_plan);
    message_event(&applied.envelope).expect("message event")
}
/// Restoring a session from text that is not a secret key must show the
/// "Invalid key." toast and leave the restoring-session busy flag unset.
#[test]
fn restoring_invalid_secret_key_shows_normie_error() {
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let shared_state = Arc::new(RwLock::new(AppState::empty()));
    // Both channel receivers are dropped right away; only state held on the
    // core itself is inspected.
    let mut core = AppCore::new(
        flume::unbounded().0,
        flume::unbounded().0,
        data_dir,
        shared_state,
    );

    let restore = AppAction::RestoreSession {
        owner_nsec: String::from("not a secret key"),
    };
    core.handle_action(restore);

    let toast = core.state.toast.as_deref();
    assert_eq!(toast, Some("Invalid key."));
    let still_restoring = core.state.busy.restoring_session;
    assert!(!still_restoring);
}
/// Removing every configured relay one after another must leave both the
/// preferences and the published state with an empty relay list and no toast.
#[test]
fn removing_last_message_server_leaves_empty_list() {
    const RELAY_ONE: &str = "wss://relay-one.example";
    const RELAY_TWO: &str = "wss://relay-two.example";

    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let shared_state = Arc::new(RwLock::new(AppState::empty()));
    let mut core = AppCore::new(
        flume::unbounded().0,
        flume::unbounded().0,
        data_dir,
        shared_state,
    );

    core.preferences.nostr_relay_urls = vec![RELAY_ONE.to_string(), RELAY_TWO.to_string()];
    core.rebuild_state();

    // Remove the relays in the order they were configured.
    for relay_url in [RELAY_ONE, RELAY_TWO].iter().copied() {
        core.handle_action(AppAction::RemoveNostrRelay {
            relay_url: relay_url.to_string(),
        });
    }

    assert!(core.preferences.nostr_relay_urls.is_empty());
    assert!(core.state.preferences.nostr_relay_urls.is_empty());
    assert_eq!(core.state.toast, None);
}
/// With the relay list cleared, sending a direct message must not raise a
/// toast and must leave an outgoing message in the `Queued` delivery state.
#[test]
fn direct_message_with_no_relays_is_queued_locally() {
    let owner = Keys::generate();
    let peer = Keys::generate();
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let shared_state = Arc::new(RwLock::new(AppState::empty()));
    let mut core = AppCore::new(
        flume::unbounded().0,
        flume::unbounded().0,
        data_dir,
        shared_state,
    );

    core.preferences.nostr_relay_urls.clear();
    core.start_primary_session(owner.clone(), owner, false, false)
        .expect("primary session");

    // Open a chat with the peer and remember its id.
    let create_chat = AppAction::CreateChat {
        peer_input: peer.public_key().to_hex(),
    };
    core.handle_action(create_chat);
    let chat_id = core
        .state
        .current_chat
        .as_ref()
        .map(|chat| chat.chat_id.clone())
        .expect("created chat");

    let send = AppAction::SendMessage {
        chat_id,
        text: String::from("queued offline"),
    };
    core.handle_action(send);

    let current = core.state.current_chat.as_ref().expect("current chat");
    assert_eq!(core.state.toast, None);
    let queued = current.messages.iter().find(|message| {
        message.body == "queued offline"
            && message.is_outgoing
            && message.delivery == DeliveryState::Queued
    });
    assert!(queued.is_some());
}
/// A runtime event published while the session has no relay URLs is queued
/// (message marked `Queued`); once a relay is configured and the pending
/// publishes are retried, the event reaches the relay, leaves the pending map,
/// and the message becomes `Sent`.
#[test]
fn queued_runtime_publish_retries_when_message_servers_return() {
    use std::time::{Duration, Instant};

    let owner = Keys::generate();
    let device = Keys::generate();
    let peer = Keys::generate();
    let relay = crate::local_relay::TestRelay::start();
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let (core_tx, core_rx) = flume::unbounded();
    let mut core = logged_in_test_core_at_data_dir(&owner, &device, data_dir);
    core.core_sender = core_tx;

    // Seed an outgoing message that the publish below is tied to.
    let chat_id = peer.public_key().to_hex();
    let message_id = String::from("retry-message");
    core.push_outgoing_message_with_id(
        message_id.clone(),
        &chat_id,
        "offline relay retry".to_string(),
        1_777_159_500,
        None,
        DeliveryState::Pending,
    );

    // Publish with no relay URLs on the session.
    core.logged_in
        .as_mut()
        .expect("logged in")
        .relay_urls
        .clear();
    let event = EventBuilder::new(Kind::from(MESSAGE_EVENT_KIND as u16), "retry body")
        .sign_with_keys(&device)
        .expect("event");
    let event_id = event.id.to_string();
    let completion = Some((message_id.clone(), chat_id.clone()));
    core.publish_runtime_event(event, "runtime", completion);

    assert!(core.pending_relay_publishes.contains_key(&event_id));
    let delivery_while_offline = core
        .threads
        .get(&chat_id)
        .and_then(|thread| thread.messages.iter().find(|message| message.id == message_id))
        .map(|message| &message.delivery);
    assert_eq!(delivery_while_offline, Some(&DeliveryState::Queued));

    // Bring the relay back and retry.
    let relay_urls = relay_urls_from_strings(&[relay.url().to_string()]);
    {
        let logged_in = core.logged_in.as_mut().expect("logged in");
        logged_in.relay_urls = relay_urls.clone();
        let client = logged_in.client.clone();
        core.runtime
            .block_on(ensure_session_relays_configured(&client, &relay_urls));
    }
    core.retry_pending_relay_publishes("test");

    let relay_has_event = || {
        relay.events().iter().any(|event| {
            event.get("id").and_then(|value| value.as_str()) == Some(event_id.as_str())
        })
    };

    // Pump core messages for up to five seconds, until the relay has the event
    // and the pending entry is gone.
    let deadline = Instant::now() + Duration::from_secs(5);
    while Instant::now() < deadline {
        while let Ok(msg) = core_rx.try_recv() {
            core.handle_message(msg);
        }
        let settled = relay_has_event() && !core.pending_relay_publishes.contains_key(&event_id);
        if settled {
            break;
        }
        std::thread::sleep(Duration::from_millis(5));
    }

    assert!(
        relay_has_event(),
        "retry should publish the queued event to the relay"
    );
    assert!(!core.pending_relay_publishes.contains_key(&event_id));
    let delivery_after_retry = core
        .threads
        .get(&chat_id)
        .and_then(|thread| thread.messages.iter().find(|message| message.id == message_id))
        .map(|message| &message.delivery);
    assert_eq!(delivery_after_retry, Some(&DeliveryState::Sent));
}
/// Processing a `PublishStagedFirstContact` effect must record the payload in
/// the pending-publish map (labelled `appcore-protocol-first-contact`, linked
/// to the message and chat) without marking it in flight.
#[test]
fn staged_first_contact_queues_payload_durably_before_delayed_publish() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer = Keys::generate();
    let mut core = logged_in_test_core("staged-first-contact-queue", &owner, &device);

    let chat_id = peer.public_key().to_hex();
    let message_id = String::from("first-contact-message");
    core.push_outgoing_message_with_id(
        message_id.clone(),
        &chat_id,
        "staged".to_string(),
        unix_now().get(),
        None,
        DeliveryState::Pending,
    );

    let bootstrap_event = EventBuilder::new(Kind::from(INVITE_RESPONSE_KIND as u16), "bootstrap")
        .sign_with_keys(&device)
        .expect("bootstrap event");
    let payload_event = EventBuilder::new(Kind::from(MESSAGE_EVENT_KIND as u16), "payload")
        .sign_with_keys(&device)
        .expect("payload event");
    let payload_id = payload_event.id.to_string();

    // Map the payload event id to the message/chat it completes.
    let mut completions = BTreeMap::new();
    completions.insert(payload_id.clone(), (message_id.clone(), chat_id.clone()));

    let bootstrap = vec![ProtocolPublishEvent {
        event: bootstrap_event,
        inner_event_id: None,
        target_owner_pubkey_hex: None,
        target_device_id: None,
    }];
    let payload = vec![ProtocolPublishEvent {
        event: payload_event,
        inner_event_id: Some(message_id.clone()),
        target_owner_pubkey_hex: Some(peer.public_key().to_hex()),
        target_device_id: Some(peer.public_key().to_hex()),
    }];
    let effects = vec![ProtocolEffect::PublishStagedFirstContact { bootstrap, payload }];
    core.process_protocol_engine_effects_with_completions(effects, &completions);

    let pending = core
        .pending_relay_publishes
        .get(&payload_id)
        .expect("payload should be queued before delayed publish");
    assert_eq!(pending.label, "appcore-protocol-first-contact");
    assert_eq!(pending.message_id.as_deref(), Some(message_id.as_str()));
    assert_eq!(pending.chat_id.as_deref(), Some(chat_id.as_str()));

    let in_flight = core.pending_relay_publish_inflight.contains(&payload_id);
    assert!(
        !in_flight,
        "payload should be durable but not in flight until the first-contact delay fires"
    );
}
/// With no desired subscription plan set, a liveness check must still retry a
/// pending relay publish: the event reaches the relay and is removed from the
/// pending map.
#[test]
fn liveness_retries_pending_relay_publish_without_active_protocol_subscription() {
    use std::time::{Duration, Instant};

    let owner = Keys::generate();
    let device = Keys::generate();
    let relay = crate::local_relay::TestRelay::start();
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let (core_tx, core_rx) = flume::unbounded();
    let mut core = logged_in_test_core_at_data_dir(&owner, &device, data_dir);
    core.core_sender = core_tx;

    let relay_urls = relay_urls_from_strings(&[relay.url().to_string()]);
    {
        let logged_in = core.logged_in.as_mut().expect("logged in");
        logged_in.relay_urls = relay_urls;
    }
    assert!(
        core.protocol_subscription_runtime.desired_plan.is_none(),
        "test should cover pending publish retry without subscription state"
    );

    // Hand-insert a pending publish that already carries a failure.
    let event = EventBuilder::new(Kind::from(MESSAGE_EVENT_KIND as u16), "retry body")
        .sign_with_keys(&device)
        .expect("event");
    let event_id = event.id.to_string();
    let pending = PendingRelayPublish {
        owner_pubkey_hex: owner.public_key().to_hex(),
        event_id: event_id.clone(),
        label: "app-keys".to_string(),
        event_json: serde_json::to_string(&event).expect("event json"),
        inner_event_id: None,
        target_owner_pubkey_hex: None,
        target_device_id: None,
        message_id: None,
        chat_id: None,
        created_at_secs: event.created_at.as_secs(),
        attempt_count: 0,
        last_error: Some("initial offline publish failed".to_string()),
    };
    core.pending_relay_publishes.insert(event_id.clone(), pending);

    let token = core.protocol_reconnect_token;
    core.handle_protocol_subscription_liveness_check(token);

    let relay_has_event = || {
        relay.events().iter().any(|event| {
            event.get("id").and_then(|value| value.as_str()) == Some(event_id.as_str())
        })
    };

    // Pump core messages for up to five seconds, until the relay has the event
    // and the pending entry is gone.
    let deadline = Instant::now() + Duration::from_secs(5);
    while Instant::now() < deadline {
        while let Ok(msg) = core_rx.try_recv() {
            core.handle_message(msg);
        }
        let settled = relay_has_event() && !core.pending_relay_publishes.contains_key(&event_id);
        if settled {
            break;
        }
        std::thread::sleep(Duration::from_millis(5));
    }

    assert!(
        relay_has_event(),
        "liveness must retry queued relay publishes even without active protocol subscriptions"
    );
    assert!(!core.pending_relay_publishes.contains_key(&event_id));
}
/// Feeding one more catch-up event than the processing chunk size must requeue
/// a `FetchCatchUpEvents` message holding exactly one remaining event.
#[test]
fn fetch_catch_up_events_requeues_large_batches_in_chunks() {
    let keys = Keys::generate();
    let chunk_size = super::lifecycle::CATCH_UP_EVENT_PROCESS_CHUNK_SIZE;
    // The inclusive range yields chunk_size + 1 events.
    let events = (0..=chunk_size)
        .map(|index| {
            let content = format!("catch-up-{index}");
            EventBuilder::new(Kind::TextNote, content)
                .sign_with_keys(&keys)
                .expect("event")
        })
        .collect::<Vec<_>>();

    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let shared_state = Arc::new(RwLock::new(AppState::empty()));
    let (core_tx, core_rx) = flume::unbounded();
    let mut core = AppCore::new(flume::unbounded().0, core_tx, data_dir, shared_state);

    core.handle_internal(InternalEvent::FetchCatchUpEvents(events));

    let requeued = core_rx.try_recv().expect("requeued remainder");
    let internal = match requeued {
        CoreMsg::Internal(event) => *event,
        other => panic!("unexpected core message: {other:?}"),
    };
    let remainder = match internal {
        InternalEvent::FetchCatchUpEvents(remainder) => remainder,
        other => panic!("unexpected internal event: {other:?}"),
    };
    assert_eq!(remainder.len(), 1);
}
/// A drain that reports three failed publishes must bump the retry backoff
/// once, schedule a liveness wakeup, clear the in-flight flag, advance the
/// state revision by exactly one, and set every pending attempt count to 1.
#[test]
fn failed_publish_drain_batches_results_and_schedules_one_retry() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("publish-drain-fail-batch", &owner, &device);

    // Pretend drain #7 is currently running.
    core.relay_transport_runtime.publish_drain_token = 7;
    core.relay_transport_runtime.publish_drain_in_flight = true;
    core.relay_transport_runtime.publish_drain_dirty = true;

    // Queue three pending publishes and build a failed result for each.
    let results = (0..3)
        .map(|index| {
            let event = EventBuilder::new(
                Kind::from(MESSAGE_EVENT_KIND as u16),
                format!("failed publish {index}"),
            )
            .sign_with_keys(&device)
            .expect("event");
            let event_id = event.id.to_string();
            let pending = PendingRelayPublish {
                owner_pubkey_hex: owner.public_key().to_hex(),
                event_id: event_id.clone(),
                label: "test".to_string(),
                event_json: serde_json::to_string(&event).expect("event json"),
                inner_event_id: None,
                target_owner_pubkey_hex: None,
                target_device_id: None,
                message_id: None,
                chat_id: None,
                created_at_secs: event.created_at.as_secs(),
                attempt_count: 0,
                last_error: None,
            };
            core.pending_relay_publishes.insert(event_id.clone(), pending);
            RelayPublishDrainResult {
                event_id,
                message_id: None,
                chat_id: None,
                success: false,
                relay_urls: Vec::new(),
                detail: "publish failed".to_string(),
            }
        })
        .collect::<Vec<_>>();

    let rev_before = core.state.rev;
    core.handle_relay_publish_drain_finished(7, results);

    assert_eq!(
        core.relay_transport_runtime.retry_backoff_attempt, 1,
        "one failed drain should schedule one transport retry, not one per event"
    );
    assert!(
        core.protocol_subscription_runtime.liveness_due_at.is_some(),
        "failed drain should schedule a retry wakeup"
    );
    assert!(!core.relay_transport_runtime.publish_drain_in_flight);
    assert_eq!(
        core.state.rev,
        rev_before + 1,
        "drain results should rebuild and emit one full state"
    );
    let every_attempt_counted = core
        .pending_relay_publishes
        .values()
        .all(|pending| pending.attempt_count == 1);
    assert!(every_attempt_counted);
}
/// Publishing local AppKeys must leave an `app-keys` entry in the pending
/// publish map whose stored event is authored by the owner, carries the latest
/// AppKeys timestamp, lists both devices in `device` tags, and records a
/// `skipped=no_servers` error.
///
/// Also checks that adding a second device strictly advances the AppKeys
/// `created_at_secs`.
#[test]
fn app_keys_publish_uses_durable_pending_publish_queue() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let mut core = logged_in_test_core_at_data_dir(
        &owner,
        &device,
        temp_dir.path().to_string_lossy().to_string(),
    );
    let owner_hex = owner.public_key().to_hex();

    core.upsert_local_app_key_device(owner.public_key(), device.public_key());
    // Read-only lookup: a shared borrow suffices, matching the lookup made
    // after the second upsert below.
    let previous_created_at = core
        .app_keys
        .get(&owner_hex)
        .expect("local AppKeys")
        .created_at_secs;

    let linked_device = Keys::generate();
    core.upsert_local_app_key_device(owner.public_key(), linked_device.public_key());
    let app_keys_created_at = core
        .app_keys
        .get(&owner_hex)
        .expect("updated AppKeys")
        .created_at_secs;
    assert!(
        app_keys_created_at > previous_created_at,
        "AppKeys updates must advance replaceable-event timestamps even inside one Unix second"
    );

    core.publish_local_app_keys();
    let pending = core
        .pending_relay_publishes
        .values()
        .find(|pending| pending.label == "app-keys")
        .expect("AppKeys should be tracked as a durable publish");
    let event: Event = serde_json::from_str(&pending.event_json).expect("stored event json");
    assert!(is_app_keys_event(&event));
    assert_eq!(event.pubkey, owner.public_key());
    assert_eq!(event.created_at.as_secs(), app_keys_created_at);

    // Collect the second element of every `device` tag.
    let device_tags = event
        .tags
        .iter()
        .filter_map(|tag| {
            let values = tag.clone().to_vec();
            if values.first().map(|value| value.as_str()) == Some("device") {
                values.get(1).cloned()
            } else {
                None
            }
        })
        .collect::<Vec<_>>();
    assert!(device_tags.contains(&device.public_key().to_hex()));
    assert!(device_tags.contains(&linked_device.public_key().to_hex()));
    assert!(
        pending
            .last_error
            .as_deref()
            .unwrap_or_default()
            .contains("skipped=no_servers"),
        "empty-relay publish should remain queued with a retryable error"
    );
}
/// Publishing AppKeys a second time (after adding a device) must leave only
/// one `app-keys` pending publish: the first event is gone and the remaining
/// one is owner-authored with a strictly newer timestamp.
#[test]
fn app_keys_publish_prunes_superseded_pending_publish() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let mut core = logged_in_test_core_at_data_dir(&owner, &device, data_dir);

    // First publish: one device.
    core.upsert_local_app_key_device(owner.public_key(), device.public_key());
    core.publish_local_app_keys();
    let superseded = core
        .pending_relay_publishes
        .values()
        .find(|pending| pending.label == "app-keys")
        .cloned()
        .expect("first AppKeys publish should be queued");
    let superseded_event: Event =
        serde_json::from_str(&superseded.event_json).expect("first AppKeys event");

    // Second publish: a linked device was added.
    let linked_device = Keys::generate();
    core.upsert_local_app_key_device(owner.public_key(), linked_device.public_key());
    core.publish_local_app_keys();

    let remaining = core
        .pending_relay_publishes
        .values()
        .filter(|pending| pending.label == "app-keys")
        .collect::<Vec<_>>();
    assert_eq!(
        remaining.len(),
        1,
        "stale replaceable AppKeys publishes should not block relay drain"
    );
    let first_still_queued = core
        .pending_relay_publishes
        .contains_key(&superseded.event_id);
    assert!(
        !first_still_queued,
        "older AppKeys event should be removed from the durable queue"
    );

    let current_event: Event =
        serde_json::from_str(&remaining[0].event_json).expect("current AppKeys event");
    assert_eq!(current_event.pubkey, owner.public_key());
    assert!(current_event.created_at.as_secs() > superseded_event.created_at.as_secs());
}
/// Upserting the same owner/device pair twice must leave the stored AppKeys
/// entry equal to what it was after the first upsert.
#[test]
fn upserting_existing_local_app_key_device_is_protocol_noop() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("appkeys-existing-device-noop", &owner, &device);
    let owner_hex = owner.public_key().to_hex();

    core.upsert_local_app_key_device(owner.public_key(), device.public_key());
    let snapshot = core
        .app_keys
        .get(&owner_hex)
        .cloned()
        .expect("local AppKeys");

    // Repeat the identical upsert.
    core.upsert_local_app_key_device(owner.public_key(), device.public_key());

    let after_repeat = core.app_keys.get(&owner_hex);
    assert_eq!(
        after_repeat,
        Some(&snapshot),
        "restoring the same owner/device must not create a newer local AppKeys protocol state"
    );
}
/// `duplicate:` and `replaced:` relay rejections count as terminal success;
/// `rate-limited:` and `blocked:` rejections do not.
#[test]
fn relay_duplicate_or_newer_replaceable_rejection_is_terminal_success() {
    // (relay rejection detail, expected classification)
    let cases = [
        ("duplicate: already have this event", true),
        ("replaced: have newer event", true),
        ("rate-limited: slow down", false),
        ("blocked: event rejected", false),
    ];
    for &(detail, expected) in &cases {
        let terminal_success = relay_publish_failure_is_terminal_success(detail);
        if expected {
            assert!(terminal_success);
        } else {
            assert!(!terminal_success);
        }
    }
}
/// After starting a session with one configured relay, the network status must
/// list that relay with a `connecting` or `offline` connection entry, zero
/// connected relays, and an all-relays-offline timestamp.
#[test]
fn network_status_includes_configured_relay_connection_status() {
    let owner = Keys::generate();
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let shared_state = Arc::new(RwLock::new(AppState::empty()));
    let mut core = AppCore::new(
        flume::unbounded().0,
        flume::unbounded().0,
        data_dir,
        shared_state,
    );

    core.preferences.nostr_relay_urls = vec!["wss://relay.invalid".to_string()];
    core.start_primary_session(owner.clone(), owner, false, false)
        .expect("primary session");

    let status = core.state.network_status.as_ref().expect("network status");
    assert_eq!(status.relay_urls, vec!["wss://relay.invalid".to_string()]);
    assert_eq!(status.relay_connections.len(), 1);

    let connection = &status.relay_connections[0];
    assert_eq!(connection.url, "wss://relay.invalid");
    assert!(
        matches!(connection.status.as_str(), "connecting" | "offline"),
        "unexpected relay status: {}",
        connection.status
    );
    assert_eq!(status.connected_relay_count, 0);
    assert!(status.all_relays_offline_since_secs.is_some());
}
/// A status change reported for `wss://relay.example/` must be logged under
/// `wss://relay.example`; a repeated `Connected` report and a report tagged
/// with a different watch generation must add no further `relay.status` log
/// entries and leave the connected count at 1.
#[test]
fn relay_status_events_match_normalized_relay_urls() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("relay-status-normalized", &owner, &device);
    core.preferences.nostr_relay_urls = vec!["wss://relay.example".to_string()];

    // Report with a trailing slash.
    core.handle_relay_status_changed("wss://relay.example/".to_string(), RelayStatus::Connected);
    let logged_without_slash = core.debug_log.iter().any(|entry| {
        entry.category == "relay.status" && entry.detail.starts_with("url=wss://relay.example ")
    });
    assert!(logged_without_slash);

    let log_count_before = core
        .debug_log
        .iter()
        .filter(|entry| entry.category == "relay.status")
        .count();

    // Same status again, then a report carrying a different generation.
    core.handle_relay_status_changed("wss://relay.example".to_string(), RelayStatus::Connected);
    let other_generation = core.relay_status_watch_generation.wrapping_add(1);
    core.handle_relay_status_changed_for_generation(
        "wss://relay.example".to_string(),
        RelayStatus::Disconnected,
        other_generation,
    );

    let log_count_after = core
        .debug_log
        .iter()
        .filter(|entry| entry.category == "relay.status")
        .count();
    assert_eq!(log_count_after, log_count_before);
    assert_eq!(core.relay_connected_count, 1);
}
/// The first `Initialized` status report must emit a `FullState` update; the
/// following `Pending` and `Connecting` reports must not change the state
/// revision, emit an update, or add `relay.status` debug-log entries.
#[test]
fn relay_status_bucket_only_changes_do_not_emit_state() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let (update_tx, update_rx) = flume::unbounded();
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let data_dir = temp_dir.path().to_string_lossy().into_owned();
    let shared_state = Arc::new(RwLock::new(AppState::empty()));
    let mut core = AppCore::new(update_tx, flume::unbounded().0, data_dir, shared_state);
    core.preferences.nostr_relay_urls = vec!["wss://relay.example".to_string()];

    // Install a logged-in session by hand, with no session relay URLs.
    let device_id = device.public_key().to_hex();
    let local_invite =
        Invite::create_new(device.public_key(), Some(device_id), None).expect("local invite");
    let owner_pubkey = owner.public_key();
    core.logged_in = Some(LoggedInState {
        owner_pubkey,
        owner_keys: Some(owner),
        device_keys: device.clone(),
        client: Client::new(device),
        relay_urls: Vec::new(),
        local_invite,
        authorization_state: LocalAuthorizationState::Authorized,
    });

    // Discard any updates queued so far.
    while update_rx.try_recv().is_ok() {}

    core.handle_relay_status_changed("wss://relay.example".to_string(), RelayStatus::Initialized);
    let first_update = update_rx.try_recv();
    assert!(
        matches!(first_update, Ok(AppUpdate::FullState(_))),
        "offline -> connecting should still emit once"
    );
    while update_rx.try_recv().is_ok() {}

    let rev_before = core.state.rev;
    let log_count_before = core
        .debug_log
        .iter()
        .filter(|entry| entry.category == "relay.status")
        .count();

    core.handle_relay_status_changed("wss://relay.example".to_string(), RelayStatus::Pending);
    core.handle_relay_status_changed("wss://relay.example".to_string(), RelayStatus::Connecting);

    assert_eq!(
        core.state.rev, rev_before,
        "internal connecting-state churn must not push a new full state"
    );
    assert!(
        update_rx.try_recv().is_err(),
        "no FullState should be emitted for connecting -> connecting transitions"
    );
    let log_count_after = core
        .debug_log
        .iter()
        .filter(|entry| entry.category == "relay.status")
        .count();
    assert_eq!(
        log_count_after, log_count_before,
        "suppressed transitions should not churn the visible debug summary"
    );
}
/// Two different events published with identical message, inner-event, owner
/// and device metadata must both be accepted and both stay in the pending
/// publish map.
#[test]
fn distinct_protocol_publishes_for_same_target_are_kept() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer = Keys::generate();
    let mut core = logged_in_test_core("distinct-protocol-publishes", &owner, &device);

    let chat_id = peer.public_key().to_hex();
    let message_id = String::from("duplicate-publish-message");
    let target_device_id = String::from("target-device");
    core.push_outgoing_message_with_id(
        message_id.clone(),
        &chat_id,
        "keep both".to_string(),
        1_777_159_500,
        None,
        DeliveryState::Pending,
    );

    let first_event = EventBuilder::new(Kind::from(MESSAGE_EVENT_KIND as u16), "first")
        .sign_with_keys(&device)
        .expect("first event");
    let first_event_id = first_event.id.to_string();
    let second_event = EventBuilder::new(Kind::from(MESSAGE_EVENT_KIND as u16), "second")
        .sign_with_keys(&device)
        .expect("second event");
    let second_event_id = second_event.id.to_string();

    let first_accepted = core.publish_runtime_event_with_metadata(
        first_event,
        APPCORE_PROTOCOL_LABEL,
        Some((message_id.clone(), chat_id.clone())),
        Some("inner-message".to_string()),
        Some(chat_id.clone()),
        Some(target_device_id.clone()),
    );
    assert!(first_accepted);

    // Same metadata, different event.
    let second_accepted = core.publish_runtime_event_with_metadata(
        second_event,
        APPCORE_PROTOCOL_LABEL,
        Some((message_id, chat_id.clone())),
        Some("inner-message".to_string()),
        Some(chat_id),
        Some(target_device_id),
    );
    assert!(second_accepted);

    let pending = &core.pending_relay_publishes;
    assert!(pending.contains_key(&first_event_id));
    assert!(pending.contains_key(&second_event_id));
    assert_eq!(pending.len(), 2);
}
/// Requesting a forced reconcile while a refresh is in flight must keep the
/// in-flight flag, set both dirty flags, and leave the reconcile token at 0.
#[test]
fn protocol_subscription_reconcile_defers_while_in_flight() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("subscription-reconcile-single-flight", &owner, &device);

    let plan = protocol_plan_for_test(vec![device.public_key()], Vec::new());
    core.protocol_subscription_runtime.desired_plan = Some(plan);
    core.protocol_subscription_runtime.refresh_in_flight = true;

    core.reconcile_protocol_subscriptions("test", true);

    let runtime = &core.protocol_subscription_runtime;
    assert!(runtime.refresh_in_flight);
    assert!(runtime.refresh_dirty);
    assert!(runtime.force_reconnect_dirty);
    assert_eq!(runtime.reconcile_token, 0);
}
/// A forced subscription refresh must set the desired plan immediately while
/// the applied plan stays `None`.
#[test]
fn protocol_subscription_desired_plan_is_not_applied_before_reconcile_success() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("subscription-desired-not-applied", &owner, &device);

    let session_relay_urls = relay_urls_from_strings(&["wss://relay.invalid".to_string()]);
    core.preferences.nostr_relay_urls = vec!["wss://relay.invalid".to_string()];
    let logged_in = core.logged_in.as_mut().expect("logged in");
    logged_in.relay_urls = session_relay_urls;

    core.request_protocol_subscription_refresh_forced();

    let runtime = &core.protocol_subscription_runtime;
    assert!(
        runtime.desired_plan.is_some(),
        "refresh computes the desired plan synchronously"
    );
    assert_eq!(
        runtime.applied_plan, None,
        "desired plan must not be reported as applied before relay apply succeeds"
    );
}
/// A reconcile completion reporting failure for the current token must clear
/// the in-flight flag and the applying plan, mark the refresh dirty, leave the
/// applied plan unset, and schedule a liveness check.
#[test]
fn failed_protocol_subscription_apply_clears_inflight_and_keeps_dirty() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("subscription-apply-failure", &owner, &device);
    let plan = protocol_plan_for_test(vec![device.public_key()], Vec::new());

    // Put the runtime into the "apply #3 in flight" state.
    {
        let runtime = &mut core.protocol_subscription_runtime;
        runtime.desired_plan = Some(plan.clone());
        runtime.applying_plan = Some(plan.clone());
        runtime.refresh_in_flight = true;
        runtime.reconcile_token = 3;
    }

    let generation = core.protocol_reconnect_token;
    let relay_statuses = vec![("wss://relay.example".to_string(), RelayStatus::Connected)];
    core.handle_protocol_subscription_reconcile_completed(
        generation,
        3,
        "test_failure".to_string(),
        Some(plan),
        false,
        Some("injected failure".to_string()),
        relay_statuses,
        1,
        1,
        1,
    );

    let runtime = &core.protocol_subscription_runtime;
    assert!(!runtime.refresh_in_flight);
    assert!(runtime.applying_plan.is_none());
    assert!(runtime.refresh_dirty);
    assert!(runtime.applied_plan.is_none());
    assert!(runtime.liveness_due_at.is_some());
}
/// A reconcile completion reporting success for the current token must clear
/// the in-flight flag, record the plan as applied, and clear the applying plan.
#[test]
fn successful_protocol_subscription_apply_sets_applied_plan() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("subscription-apply-success", &owner, &device);
    let plan = protocol_plan_for_test(vec![device.public_key()], Vec::new());

    // Put the runtime into the "apply #5 in flight" state.
    {
        let runtime = &mut core.protocol_subscription_runtime;
        runtime.desired_plan = Some(plan.clone());
        runtime.applying_plan = Some(plan.clone());
        runtime.refresh_in_flight = true;
        runtime.reconcile_token = 5;
    }

    let generation = core.protocol_reconnect_token;
    let relay_statuses = vec![("wss://relay.example".to_string(), RelayStatus::Connected)];
    core.handle_protocol_subscription_reconcile_completed(
        generation,
        5,
        "test_success".to_string(),
        Some(plan.clone()),
        true,
        None,
        relay_statuses,
        1,
        1,
        1,
    );

    let runtime = &core.protocol_subscription_runtime;
    assert!(!runtime.refresh_in_flight);
    assert_eq!(runtime.applied_plan, Some(plan));
    assert!(runtime.applying_plan.is_none());
}
/// A reconcile completion carrying token 6 while the runtime expects token 7
/// must leave the in-flight flag set and the connected-relay count at 0.
#[test]
fn stale_protocol_subscription_reconcile_completion_is_ignored() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("subscription-reconcile-stale", &owner, &device);

    core.protocol_reconnect_token = 5;
    {
        let runtime = &mut core.protocol_subscription_runtime;
        runtime.reconcile_token = 7;
        runtime.refresh_in_flight = true;
    }

    // Token 6 does not match the runtime's current token 7.
    let relay_statuses = vec![("wss://relay.example".to_string(), RelayStatus::Connected)];
    core.handle_protocol_subscription_reconcile_completed(
        5,
        6,
        "stale".to_string(),
        None,
        false,
        None,
        relay_statuses,
        0,
        1,
        0,
    );

    let still_in_flight = core.protocol_subscription_runtime.refresh_in_flight;
    assert!(still_in_flight);
    assert_eq!(core.relay_connected_count, 0);
}
/// Scheduling a liveness check changes `protocol_reconnect_token`, yet a
/// successful reconcile completion tagged with the earlier token value must
/// still clear the in-flight flag and record the applied plan.
#[test]
fn liveness_token_does_not_stale_subscription_completion() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("subscription-liveness-token", &owner, &device);
    let plan = protocol_plan_for_test(vec![device.public_key()], Vec::new());

    // Put the runtime into the "apply #9 in flight" state.
    {
        let runtime = &mut core.protocol_subscription_runtime;
        runtime.desired_plan = Some(plan.clone());
        runtime.applying_plan = Some(plan.clone());
        runtime.refresh_in_flight = true;
        runtime.reconcile_token = 9;
    }

    // Capture the token before the liveness schedule changes it.
    let apply_generation = core.protocol_reconnect_token;
    core.schedule_protocol_subscription_liveness_check(Duration::from_secs(30));
    assert_ne!(core.protocol_reconnect_token, apply_generation);

    let relay_statuses = vec![("wss://relay.example".to_string(), RelayStatus::Connected)];
    core.handle_protocol_subscription_reconcile_completed(
        apply_generation,
        9,
        "test_success_after_liveness_schedule".to_string(),
        Some(plan.clone()),
        true,
        None,
        relay_statuses,
        1,
        1,
        1,
    );

    let runtime = &core.protocol_subscription_runtime;
    assert!(!runtime.refresh_in_flight);
    assert_eq!(runtime.applied_plan, Some(plan));
}
/// Requesting a debug-snapshot persist while a write is in flight must keep
/// the in-flight flag and set the dirty flag.
#[test]
fn debug_snapshot_write_is_coalesced_while_in_flight() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("debug-snapshot-coalesce", &owner, &device);

    // Simulate a write that has not finished yet.
    core.debug_snapshot_write_inflight = true;
    core.persist_debug_snapshot_best_effort();

    let (in_flight, dirty) = (
        core.debug_snapshot_write_inflight,
        core.debug_snapshot_write_dirty,
    );
    assert!(in_flight);
    assert!(dirty);
}
/// A write-finished notification for generation 9 while the current
/// generation is 10 must leave the in-flight and dirty flags set.
#[test]
fn stale_debug_snapshot_write_completion_is_ignored() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("debug-snapshot-stale", &owner, &device);

    core.debug_snapshot_write_generation = 10;
    core.debug_snapshot_write_inflight = true;
    core.debug_snapshot_write_dirty = true;

    // Generation 9 does not match the current generation 10.
    core.handle_debug_snapshot_write_finished(9);

    let (in_flight, dirty) = (
        core.debug_snapshot_write_inflight,
        core.debug_snapshot_write_dirty,
    );
    assert!(in_flight);
    assert!(dirty);
}
/// With a connected test relay and an active chat for the peer, a liveness
/// check must add a `protocol.catch_up.fetch` entry to the debug log.
#[test]
fn liveness_retries_protocol_backfill_for_tracked_peer_missing_appkeys_when_connected() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer = Keys::generate();
    let relay = crate::local_relay::TestRelay::start();
    let mut core = logged_in_test_core("tracked-peer-liveness-backfill", &owner, &device);

    let relay_urls = relay_urls_from_strings(&[relay.url().to_string()]);
    core.preferences.nostr_relay_urls = vec![relay.url().to_string()];
    {
        let logged_in = core.logged_in.as_mut().expect("logged in");
        logged_in.relay_urls = relay_urls.clone();
        let client = logged_in.client.clone();

        // Configure and connect, then poll for up to two seconds until at
        // least one relay reports `Connected`.
        let connected_relays = core.runtime.block_on(async {
            ensure_session_relays_configured(&client, &relay_urls).await;
            connect_client_with_timeout(&client, Duration::from_secs(2)).await;
            let give_up_at = tokio::time::Instant::now() + Duration::from_secs(2);
            loop {
                let connected_now = client
                    .relays()
                    .await
                    .values()
                    .filter(|candidate| candidate.status() == RelayStatus::Connected)
                    .count();
                if connected_now > 0 || tokio::time::Instant::now() >= give_up_at {
                    break connected_now;
                }
                sleep(Duration::from_millis(50)).await;
            }
        });
        assert!(connected_relays > 0, "test relay must be connected");
    }

    core.active_chat_id = Some(peer.public_key().to_hex());
    core.request_protocol_subscription_refresh_forced();
    assert!(
        core.protocol_subscription_runtime.desired_plan.is_some(),
        "tracked peer setup should create runtime protocol subscriptions"
    );

    // Only entries logged by the liveness check itself should count.
    core.debug_log.clear();
    let token = core.protocol_reconnect_token;
    core.handle_protocol_subscription_liveness_check(token);

    let fetched = core
        .debug_log
        .iter()
        .any(|entry| entry.category == "protocol.catch_up.fetch");
    assert!(
        fetched,
        "connected-relay liveness must still backfill tracked peers with missing AppKeys"
    );
}
/// Liveness scheduling must keep the earliest pending deadline: an equal or later
/// request leaves the reconnect token and due time untouched, while an earlier
/// request produces a larger token and an earlier due time.
#[test]
fn protocol_liveness_scheduling_keeps_earliest_reconnect_deadline() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("protocol-liveness-earliest-deadline", &owner, &device);
    let invalid_relay_urls = relay_urls_from_strings(&["wss://relay.invalid".to_string()]);
    core.preferences.nostr_relay_urls = vec!["wss://relay.invalid".to_string()];
    core.logged_in.as_mut().expect("logged in").relay_urls = invalid_relay_urls;
    core.request_protocol_subscription_refresh_forced();
    assert!(
        core.protocol_subscription_runtime.desired_plan.is_some(),
        "logged-in session should derive protocol subscriptions"
    );

    let slow_delay = Duration::from_secs(30);
    let fast_delay = Duration::from_secs(2);

    // Clear whatever the forced refresh may have scheduled so the first request below
    // is the one that sets the deadline.
    core.protocol_subscription_runtime.liveness_due_at = None;
    core.schedule_protocol_subscription_liveness_check(slow_delay);
    let slow_token = core.protocol_reconnect_token;
    let slow_due = core
        .protocol_subscription_runtime
        .liveness_due_at
        .expect("initial liveness should be scheduled");

    // Same delay again: nothing may change.
    core.schedule_protocol_subscription_liveness_check(slow_delay);
    assert_eq!(
        core.protocol_reconnect_token, slow_token,
        "a later/equal liveness request must not cancel the pending reconnect"
    );
    assert_eq!(
        core.protocol_subscription_runtime.liveness_due_at,
        Some(slow_due)
    );

    // Shorter delay: token and deadline are both replaced.
    core.schedule_protocol_subscription_liveness_check(fast_delay);
    let quick_token = core.protocol_reconnect_token;
    let quick_due = core
        .protocol_subscription_runtime
        .liveness_due_at
        .expect("fast reconnect should be scheduled");
    assert!(
        slow_token < quick_token,
        "an earlier liveness request should replace the previous deadline"
    );
    assert!(
        slow_due > quick_due,
        "fast reconnect should move the liveness deadline earlier"
    );

    // Longer delay after the short one: the short deadline survives.
    core.schedule_protocol_subscription_liveness_check(slow_delay);
    assert_eq!(
        core.protocol_reconnect_token, quick_token,
        "a later liveness request must not starve the fast reconnect"
    );
    assert_eq!(
        core.protocol_subscription_runtime.liveness_due_at,
        Some(quick_due)
    );
}
/// Tracked-peer catch-up scheduling keeps one timer: repeating the same delay leaves
/// token and due time unchanged, and a shorter delay yields a larger token and an
/// earlier due time.
#[test]
fn tracked_peer_catch_up_scheduling_coalesces_to_earliest_deadline() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("tracked-peer-catch-up-coalesce", &owner, &device);
    let slow_delay = Duration::from_secs(30);

    core.schedule_tracked_peer_catch_up(slow_delay);
    let first_token = core
        .protocol_subscription_runtime
        .tracked_peer_catch_up_token;
    let first_due = core
        .protocol_subscription_runtime
        .tracked_peer_catch_up_due_at
        .expect("initial catch-up should be scheduled");

    // Second request with the same delay.
    core.schedule_tracked_peer_catch_up(slow_delay);
    {
        let runtime = &core.protocol_subscription_runtime;
        assert_eq!(
            runtime.tracked_peer_catch_up_token, first_token,
            "a later/equal tracked-peer catch-up must not spawn another timer"
        );
        assert_eq!(runtime.tracked_peer_catch_up_due_at, Some(first_due));
    }

    // Shorter delay replaces the pending timer.
    core.schedule_tracked_peer_catch_up(Duration::from_secs(2));
    let replaced_token = core
        .protocol_subscription_runtime
        .tracked_peer_catch_up_token;
    assert!(
        replaced_token > first_token,
        "an earlier tracked-peer catch-up should replace the old timer"
    );
    assert!(
        core.protocol_subscription_runtime
            .tracked_peer_catch_up_due_at
            .expect("fast catch-up should stay scheduled")
            < first_due
    );
}
/// With `protocol_fetch_in_flight` already set, `fetch_recent_protocol_state` must
/// return `false` and leave a `protocol.catch_up.skip` entry in the debug log.
#[test]
fn protocol_state_fetch_is_single_flight() {
    let (owner, device) = (Keys::generate(), Keys::generate());
    let mut core = logged_in_test_core("protocol-fetch-single-flight", &owner, &device);
    core.protocol_subscription_runtime.protocol_fetch_in_flight = true;
    core.debug_log.clear();

    let fetch_started = core.fetch_recent_protocol_state();
    assert!(
        !fetch_started,
        "existing protocol fetch should block duplicate catch-up fetch"
    );

    let skip_logged = core
        .debug_log
        .iter()
        .any(|entry| entry.category == "protocol.catch_up.skip");
    assert!(
        skip_logged,
        "skipped duplicate fetch should be visible in debug output"
    );
}
/// With `protocol_fetch_in_flight` already set, a filter-targeted fetch must return
/// `false`, log `protocol.engine_fetch.skip`, and leave a tracked-peer catch-up
/// deadline scheduled.
#[test]
fn targeted_protocol_fetch_is_single_flight() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer = Keys::generate();
    let mut core = logged_in_test_core("targeted-protocol-fetch-single-flight", &owner, &device);
    core.protocol_subscription_runtime.protocol_fetch_in_flight = true;
    core.debug_log.clear();

    // A single filter: AppKeys events authored by the peer.
    let app_keys_filter = Filter::new()
        .author(peer.public_key())
        .kind(Kind::Custom(APP_KEYS_EVENT_KIND as u16));
    let fetch_started = core.fetch_protocol_state_for_filters(vec![app_keys_filter], "test");
    assert!(
        !fetch_started,
        "existing protocol fetch should block duplicate targeted engine fetch"
    );

    let skip_logged = core
        .debug_log
        .iter()
        .any(|entry| entry.category == "protocol.engine_fetch.skip");
    assert!(
        skip_logged,
        "skipped targeted fetch should be visible in debug output"
    );

    let retry_scheduled = core
        .protocol_subscription_runtime
        .tracked_peer_catch_up_due_at
        .is_some();
    assert!(
        retry_scheduled,
        "skipped targeted fetch should schedule a coalesced retry"
    );
}
/// When the last fetch start is recorded as "now", a broad catch-up fetch must return
/// `false`, log a `protocol.catch_up.skip` entry mentioning "rate limited", and leave
/// a tracked-peer catch-up deadline scheduled.
#[test]
fn protocol_fetch_start_is_rate_limited() {
    let (owner, device) = (Keys::generate(), Keys::generate());
    let mut core = logged_in_test_core("protocol-fetch-rate-limit", &owner, &device);
    core.protocol_subscription_runtime
        .protocol_fetch_last_started_at = Some(Instant::now());
    core.debug_log.clear();

    let fetch_started = core.fetch_recent_protocol_state();
    assert!(
        !fetch_started,
        "recent protocol fetch should rate-limit broad catch-up fetches"
    );

    let rate_limit_logged = core
        .debug_log
        .iter()
        .filter(|entry| entry.category == "protocol.catch_up.skip")
        .any(|entry| entry.detail.contains("rate limited"));
    assert!(
        rate_limit_logged,
        "rate-limited fetch should be visible in debug output"
    );

    let retry_scheduled = core
        .protocol_subscription_runtime
        .tracked_peer_catch_up_due_at
        .is_some();
    assert!(
        retry_scheduled,
        "rate-limited fetch should schedule one coalesced retry"
    );
}
/// A last-start timestamp 30 seconds in the future must be handled like a rate-limit
/// hit: the fetch returns `false`, a "rate limited" skip is logged, and a tracked-peer
/// catch-up deadline is scheduled.
#[test]
fn protocol_fetch_rate_limit_tolerates_future_start_time() {
    let (owner, device) = (Keys::generate(), Keys::generate());
    let mut core = logged_in_test_core("protocol-fetch-future-rate-limit", &owner, &device);
    let future_start = Instant::now() + Duration::from_secs(30);
    core.protocol_subscription_runtime
        .protocol_fetch_last_started_at = Some(future_start);
    core.debug_log.clear();

    let fetch_started = core.fetch_recent_protocol_state();
    assert!(
        !fetch_started,
        "future protocol fetch timestamp should rate-limit instead of panicking"
    );

    let rate_limit_logged = core
        .debug_log
        .iter()
        .filter(|entry| entry.category == "protocol.catch_up.skip")
        .any(|entry| entry.detail.contains("rate limited"));
    assert!(
        rate_limit_logged,
        "future timestamp should be reported as a rate-limit skip"
    );

    let retry_scheduled = core
        .protocol_subscription_runtime
        .tracked_peer_catch_up_due_at
        .is_some();
    assert!(
        retry_scheduled,
        "future timestamp should schedule one coalesced retry"
    );
}
/// A last-start timestamp 60 seconds in the past must not produce a
/// `protocol.catch_up.skip` entry that mentions "rate limited".
#[test]
fn protocol_fetch_rate_limit_tolerates_stale_start_time() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let mut core = logged_in_test_core("protocol-fetch-stale-rate-limit", &owner, &device);
    // `Instant - Duration` panics when the result cannot be represented (for example
    // when the platform clock's origin is less than 60s in the past). Build the stale
    // timestamp with `checked_sub` so the test itself cannot panic on subtraction; if
    // no such instant exists the scenario cannot be constructed and there is nothing
    // to verify.
    let stale_start = match Instant::now().checked_sub(Duration::from_secs(60)) {
        Some(instant) => instant,
        None => return,
    };
    core.protocol_subscription_runtime
        .protocol_fetch_last_started_at = Some(stale_start);
    core.debug_log.clear();
    // Whether the fetch actually starts is irrelevant here; only the log is inspected.
    let _ = core.fetch_recent_protocol_state();
    assert!(
        core.debug_log
            .iter()
            .all(|entry| entry.category != "protocol.catch_up.skip"
                || !entry.detail.contains("rate limited")),
        "stale protocol fetch timestamp should not trigger rate-limit subtraction"
    );
}
/// Exercises the liveness check for a tracked peer whose AppKeys roster has been
/// ingested but for whom the engine reports no message authors, and expects a
/// `protocol.catch_up.fetch` entry in the debug log.
#[test]
fn liveness_retries_protocol_backfill_for_tracked_peer_with_roster_but_no_session() {
let owner = Keys::generate();
let device = Keys::generate();
let peer_owner = Keys::generate();
let peer_device = Keys::generate();
let relay = crate::local_relay::TestRelay::start();
let mut core = logged_in_test_core("tracked-peer-roster-no-session-backfill", &owner, &device);
let relay_urls = relay_urls_from_strings(&[relay.url().to_string()]);
core.preferences.nostr_relay_urls = vec![relay.url().to_string()];
// Point the logged-in session at the local test relay and wait for a connection.
{
let logged_in = core.logged_in.as_mut().expect("logged in");
logged_in.relay_urls = relay_urls.clone();
let client = logged_in.client.clone();
let connected = core.runtime.block_on(async {
ensure_session_relays_configured(&client, &relay_urls).await;
connect_client_with_timeout(&client, Duration::from_secs(2)).await;
// Poll every 50ms until some relay reports Connected or the 2s deadline passes.
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
let connected = client
.relays()
.await
.values()
.filter(|relay| relay.status() == RelayStatus::Connected)
.count();
if connected > 0 || tokio::time::Instant::now() >= deadline {
break connected;
}
sleep(Duration::from_millis(50)).await;
}
});
assert!(connected > 0, "test relay must be connected");
}
// Give the engine a one-device roster for the peer owner and process the resulting batch.
let peer_app_keys = AppKeys::new(vec![DeviceEntry::new(peer_device.public_key(), 1)]);
{
let batch = core
.protocol_engine
.as_mut()
.expect("protocol engine")
.ingest_app_keys_snapshot(peer_owner.public_key(), peer_app_keys.clone(), 1)
.expect("ingest peer appkeys");
core.process_protocol_engine_retry_batch("test_app_keys", batch);
}
// Mirror the same roster in the core-level `app_keys` map, keyed by the owner's hex pubkey.
core.app_keys.insert(
peer_owner.public_key().to_hex(),
known_app_keys_from_ndr(peer_owner.public_key(), &peer_app_keys, 1),
);
core.active_chat_id = Some(peer_owner.public_key().to_hex());
// Precondition: the engine lists no message authors for this peer owner.
assert!(
core.protocol_engine
.as_ref()
.expect("protocol engine")
.message_author_pubkeys_for_owner(peer_owner.public_key())
.is_empty(),
"peer roster without invite response must not have message authors yet"
);
core.request_protocol_subscription_refresh_forced();
assert!(
core.protocol_subscription_runtime.desired_plan.is_some(),
"tracked peer roster should derive protocol-state subscriptions"
);
// Drop earlier log entries so only output from the liveness check is inspected.
core.debug_log.clear();
let token = core.protocol_reconnect_token;
core.handle_protocol_subscription_liveness_check(token);
assert!(
core.debug_log
.iter()
.any(|entry| entry.category == "protocol.catch_up.fetch"),
"connected-relay liveness must backfill tracked peers that have AppKeys but no session authors"
);
}
/// Reads `src/core/protocol.rs` and checks that the source text of each listed
/// backfill function contains the `ensure_session_relays_configured` call.
///
/// A function's text runs from its `pub(super) fn` signature to the next
/// `pub(super) fn` marker, else to the next `fn` marker, else to end of file.
#[test]
fn protocol_backfill_fetches_configure_relays_before_network_fetch() {
    let protocol_path =
        std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/core/protocol.rs");
    let protocol_source = std::fs::read_to_string(protocol_path).expect("read protocol source");
    let backfill_fns = [
        "fetch_recent_messages_for_author",
        "fetch_recent_group_sender_key_messages_for_author",
        "fetch_recent_protocol_state",
    ];
    for fn_name in backfill_fns {
        let signature = format!("pub(super) fn {fn_name}");
        let start = match protocol_source.find(&signature) {
            Some(index) => index,
            None => panic!("missing {fn_name}"),
        };
        let tail = &protocol_source[start..];
        let end = match tail.find("\n pub(super) fn ") {
            Some(offset) => offset,
            None => tail.find("\n fn ").unwrap_or(tail.len()),
        };
        let body = &tail[..end];
        let configures_relays =
            body.contains("ensure_session_relays_configured(&client, &relay_urls).await;");
        assert!(
            configures_relays,
            "{fn_name} must configure relays before fetching public-relay backfill"
        );
    }
}
/// Reads `src/core.rs` and inspects the text of `connect_client_with_timeout` (up to
/// `connected_relay_count_for_client`): it must check the connected-relay count, must
/// call `client.connect()`, and must not call `client.disconnect()`.
#[test]
fn relay_connect_helper_does_not_disconnect_when_all_relays_stay_offline() {
    let core_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/core.rs");
    let core_source = std::fs::read_to_string(core_path).expect("read core source");
    let start = core_source
        .find("async fn connect_client_with_timeout")
        .expect("connect helper");
    // The helper's text ends where the next async helper starts, or at end of file.
    let tail = &core_source[start..];
    let helper_len = tail
        .find("\nasync fn connected_relay_count_for_client")
        .unwrap_or(tail.len());
    let body = &tail[..helper_len];

    let checks_connected_count =
        body.contains("connected_relay_count_for_client(client).await > 0");
    assert!(
        checks_connected_count,
        "connect helper must detect all-offline relay clients"
    );

    let disconnects = body.contains("client.disconnect().await");
    assert!(
        !disconnects,
        "normal relay connect helper must not tear down the shared client when every relay remains offline"
    );

    let connects = body.contains("client.connect().await");
    assert!(
        connects,
        "connect helper must start the shared relay client without owning disconnect lifecycle"
    );
}
/// Reads `src/core/publishing.rs` and checks the text of `retry_pending_relay_publishes`
/// (up to `handle_relay_publish_drain_finished`) for the identifiers the drain is
/// expected to reference, and for the absence of per-event connect/disconnect calls.
/// The whole file must also not mention `spawn_relay_publish_attempt`.
#[test]
fn runtime_publish_uses_single_flight_transport_drain() {
    let publishing_path =
        std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/core/publishing.rs");
    let publish_source =
        std::fs::read_to_string(publishing_path).expect("read publishing source");
    let start = publish_source
        .find("pub(super) fn retry_pending_relay_publishes")
        .expect("pending relay publish retry");
    let end = publish_source[start..]
        .find("\n pub(super) fn handle_relay_publish_drain_finished")
        .map_or(publish_source.len(), |offset| start + offset);
    let body = &publish_source[start..end];

    let coalesces_drain =
        body.contains("publish_drain_in_flight") && body.contains("publish_drain_dirty");
    assert!(
        coalesces_drain,
        "pending relay publishes must coalesce through one drain worker"
    );

    let requests_connection = body.contains("request_relay_connection");
    assert!(
        requests_connection,
        "offline pending publish retry must request the shared relay transport connection"
    );

    let bounded_publish = body.contains("publish_event_to_any_relay")
        && body.contains("PENDING_RELAY_DRAIN_CONCURRENCY");
    assert!(
        bounded_publish,
        "drain worker must publish with bounded no-connect attempts"
    );

    let owns_connection_lifecycle =
        body.contains("connect_client_with_timeout") || body.contains("client.disconnect");
    assert!(
        !owns_connection_lifecycle,
        "drain worker must not connect or disconnect the shared relay client per pending event"
    );

    let spawns_per_event = publish_source.contains("spawn_relay_publish_attempt");
    assert!(
        !spawns_per_event,
        "pending relay publish retry must not spawn one connection-owning task per event"
    );
}
/// Reads `src/core/chats.rs` and checks that the text of `send_direct_message` (up to
/// `send_group_message`) does not mention any of three reconnect/catch-up entry points.
#[test]
fn direct_send_hot_path_does_not_force_global_catch_up_for_established_messages() {
    let chats_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/core/chats.rs");
    let chats_source = std::fs::read_to_string(chats_path).expect("read chats source");
    let start = chats_source
        .find("pub(super) fn send_direct_message")
        .expect("send_direct_message");
    let tail = &chats_source[start..];
    let body_len = tail
        .find("\n pub(super) fn send_group_message")
        .unwrap_or(tail.len());
    let body = &tail[..body_len];

    // Each pair is (identifier that must be absent, failure message).
    let forbidden_calls = [
        (
            "request_protocol_subscription_refresh_forced_reconnect_if_offline",
            "normal direct sends must not force relay/subscription reconnect on every message",
        ),
        (
            "fetch_recent_messages_for_tracked_peers",
            "normal direct sends must not launch global tracked-peer catch-up on every message",
        ),
        (
            "schedule_tracked_peer_catch_up",
            "normal direct sends must not schedule delayed global catch-up on every message",
        ),
    ];
    for (identifier, reason) in forbidden_calls {
        assert!(!body.contains(identifier), "{reason}");
    }
}
/// Reads `src/core/protocol.rs` and checks that the text of
/// `handle_relay_transport_connection_finished` (up to `schedule_relay_transport_retry`)
/// contains both the `connect_failed` retry call and the `relay_transport_connected`
/// publish-drain call.
#[test]
fn zero_connected_transport_connect_schedules_backoff_retry() {
    let protocol_path =
        std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/core/protocol.rs");
    let protocol_source = std::fs::read_to_string(protocol_path).expect("read protocol source");
    let start = protocol_source
        .find("pub(super) fn handle_relay_transport_connection_finished")
        .expect("transport connection handler");
    let tail = &protocol_source[start..];
    let handler_len = tail
        .find("\n pub(super) fn schedule_relay_transport_retry")
        .unwrap_or(tail.len());
    let body = &tail[..handler_len];

    let schedules_retry = body.contains("schedule_relay_transport_retry(\"connect_failed\")");
    let handles_both_outcomes = schedules_retry
        && body.contains("retry_pending_relay_publishes(\"relay_transport_connected\")");
    assert!(
        handles_both_outcomes,
        "zero-connected transport checks must back off, while successful connects drain pending publishes"
    );
}
/// Reads `src/core/relay.rs` and checks that the text between the
/// `"appcore.protocol.message.pending"` literal and the next `Err(error)` mentions both
/// `schedule_protocol_subscription_liveness_check` and `PROTOCOL_RECONNECT_CHECK_SECS`.
#[test]
fn pending_inbound_direct_message_schedules_fast_liveness_retry() {
    let relay_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/core/relay.rs");
    let relay_source = std::fs::read_to_string(relay_path).expect("read relay source");
    let start = relay_source
        .find("\"appcore.protocol.message.pending\"")
        .expect("pending direct message branch");
    let end = match relay_source[start..].find("Err(error)") {
        Some(offset) => start + offset,
        None => relay_source.len(),
    };
    let body = &relay_source[start..end];

    let schedules_liveness = body.contains("schedule_protocol_subscription_liveness_check");
    let schedules_fast_retry =
        schedules_liveness && body.contains("PROTOCOL_RECONNECT_CHECK_SECS");
    assert!(
        schedules_fast_retry,
        "pending inbound direct events must schedule a fast protocol retry instead of waiting for restart/foreground"
    );
}
/// Reads `src/core/protocol.rs` and checks that the text of
/// `handle_protocol_subscription_liveness_check` (up to
/// `reconcile_protocol_subscriptions`) mentions both
/// `has_pending_inbound_direct_events` and `should_retry_backfill`.
#[test]
fn liveness_retries_pending_inbound_direct_events() {
    let protocol_path =
        std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/core/protocol.rs");
    let protocol_source = std::fs::read_to_string(protocol_path).expect("read protocol source");
    let start = protocol_source
        .find("pub(super) fn handle_protocol_subscription_liveness_check")
        .expect("liveness handler");
    let end = protocol_source[start..]
        .find("\n pub(super) fn reconcile_protocol_subscriptions")
        .map_or(protocol_source.len(), |offset| start + offset);
    let body = &protocol_source[start..end];

    let checks_pending_inbound = body.contains("has_pending_inbound_direct_events");
    let retries_pending_inbound = checks_pending_inbound && body.contains("should_retry_backfill");
    assert!(
        retries_pending_inbound,
        "protocol liveness must retry durable pending inbound direct events even when tracked-peer backfill appears complete"
    );
}
/// Reads the listed protocol-engine source files (relative to the crate manifest
/// directory) and returns their contents joined with newlines, in list order.
///
/// # Panics
///
/// Panics with `read {path}: {error}` if any file cannot be read.
fn read_protocol_engine_source() -> String {
    const ENGINE_SOURCE_FILES: [&str; 8] = [
        "src/core/protocol_engine.rs",
        "src/core/protocol_engine/types.rs",
        "src/core/protocol_engine/engine_core.rs",
        "src/core/protocol_engine/engine_sends.rs",
        "src/core/protocol_engine/engine_incoming_retry.rs",
        "src/core/protocol_engine/engine_resolution.rs",
        "src/core/protocol_engine/engine_queue_filters.rs",
        "src/core/protocol_engine/free_functions.rs",
    ];
    let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
    let mut sources = Vec::with_capacity(ENGINE_SOURCE_FILES.len());
    for path in ENGINE_SOURCE_FILES {
        match std::fs::read_to_string(manifest_dir.join(path)) {
            Ok(text) => sources.push(text),
            Err(error) => panic!("read {path}: {error}"),
        }
    }
    sources.join("\n")
}
/// Scans the concatenated protocol-engine sources: the text of
/// `resolve_message_sender_owner` (up to `ensure_local_roster`) must mention
/// `PendingOwnerClaim` and must not build an owner pubkey from `envelope.sender`.
#[test]
fn appcore_sender_owner_resolution_keeps_claimed_device_pending_until_owner_verified() {
    let engine_source = read_protocol_engine_source();
    let start = engine_source
        .find("fn resolve_message_sender_owner")
        .expect("sender resolver");
    let tail = &engine_source[start..];
    let resolver_len = tail.find("\n fn ensure_local_roster").unwrap_or(tail.len());
    let body = &tail[..resolver_len];

    let uses_pending_claim = body.contains("PendingOwnerClaim");
    assert!(
        uses_pending_claim,
        "claimed owners must be represented as pending, not collapsed into a device owner"
    );

    let promotes_sender_to_owner =
        body.contains("NdrOwnerPubkey::from_bytes(envelope.sender.to_bytes())");
    assert!(
        !promotes_sender_to_owner,
        "message envelope sender is a ratchet sender key and must not become the canonical owner"
    );
}
/// Scans the concatenated protocol-engine sources: the text of
/// `pending_inbound_owner_claim_targets` (up to
/// `pending_group_pairwise_owner_claim_targets`) must mention
/// `claimed_owner_pubkey_hex` and must not mention `parse_message_event`.
#[test]
fn pending_inbound_owner_targets_use_cached_metadata_not_event_reparse() {
    let engine_source = read_protocol_engine_source();
    let start = engine_source
        .find("fn pending_inbound_owner_claim_targets")
        .expect("pending inbound target collector");
    let end = match engine_source[start..].find("\n fn pending_group_pairwise_owner_claim_targets")
    {
        Some(offset) => start + offset,
        None => engine_source.len(),
    };
    let body = &engine_source[start..end];

    let uses_cached_owner = body.contains("claimed_owner_pubkey_hex");
    assert!(
        uses_cached_owner,
        "pending inbound owner target collection must use cached owner metadata"
    );

    let reparses_events = body.contains("parse_message_event");
    assert!(
        !reparses_events,
        "pending inbound owner target collection runs on the relay hot path and must not verify every pending event"
    );
}
/// Reads `src/core/relay.rs` and checks that the text from `if kind == MESSAGE_EVENT_KIND`
/// to the next `self.remember_event(event_id);` mentions both
/// `has_pending_inbound_direct_event_id` and the
/// `appcore.protocol.message.pending_replay` marker.
#[test]
fn relay_pending_inbound_replays_are_short_circuited() {
    let relay_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("src/core/relay.rs");
    let relay_source = std::fs::read_to_string(relay_path).expect("read relay source");
    let start = relay_source
        .find("if kind == MESSAGE_EVENT_KIND")
        .expect("message event branch");
    let tail = &relay_source[start..];
    let branch_len = tail
        .find("\n self.remember_event(event_id);")
        .unwrap_or(tail.len());
    let body = &tail[..branch_len];

    let checks_pending_id = body.contains("has_pending_inbound_direct_event_id");
    let short_circuits =
        checks_pending_id && body.contains("appcore.protocol.message.pending_replay");
    assert!(
        short_circuits,
        "relay replays of already-durable pending inbound events must avoid reparsing and refetching immediately"
    );
}
/// Scans the concatenated protocol-engine sources in two places:
/// `process_group_outer_event` must branch on `result.pending` and not on an empty
/// event list, and `handle_group_sender_key_message` must mention the `Ignored`
/// handle result together with `consumed: true`.
#[test]
fn group_sender_key_ignored_results_are_consumed_without_retry_queue() {
    let engine_source = read_protocol_engine_source();

    // Section 1: process_group_outer_event .. process_group_pairwise_payload.
    let process_start = engine_source
        .find("pub(super) fn process_group_outer_event")
        .expect("process group outer function");
    let process_tail = &engine_source[process_start..];
    let process_len = process_tail
        .find("pub(super) fn process_group_pairwise_payload")
        .unwrap_or(process_tail.len());
    let process_body = &process_tail[..process_len];
    let queues_on_pending = process_body.contains("if result.pending");
    assert!(
        queues_on_pending,
        "group outer handling must queue sender-key messages only for explicit pending results"
    );
    let queues_on_empty_events = process_body.contains("if result.events.is_empty()");
    assert!(
        !queues_on_empty_events,
        "ignored sender-key results have no events but must not be queued for retry"
    );

    // Section 2: handle_group_sender_key_message .. upsert_pending_outbound.
    let handle_start = engine_source
        .find("fn handle_group_sender_key_message")
        .expect("handle sender key function");
    let handle_tail = &engine_source[handle_start..];
    let handle_len = handle_tail
        .find("fn upsert_pending_outbound")
        .unwrap_or(handle_tail.len());
    let handle_body = &handle_tail[..handle_len];
    let consumes_ignored = handle_body.contains("GroupSenderKeyHandleResult::Ignored")
        && handle_body.contains("consumed: true");
    assert!(
        consumes_ignored,
        "ignored parsed sender-key events should be consumed so public-relay replays do not loop"
    );
}
/// Sends a direct text to an owner for whom no AppKeys were ingested and checks that
/// no signed publish targets that owner, that both the peer and local `owner:` markers
/// are queued, and that the engine snapshot reports one pending outbound entry holding
/// both markers.
#[test]
fn appcore_protocol_engine_missing_remote_owner_send_keeps_owner_pending() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer_owner = Keys::generate();
    let mut engine = test_protocol_engine(&owner, &device);
    observe_current_device_appkeys_for_test(&mut engine, &owner, &device);

    let peer_owner_hex = peer_owner.public_key().to_hex();
    let result = engine
        .send_direct_text(
            peer_owner.public_key(),
            &peer_owner_hex,
            "first",
            None,
            UnixSeconds(3),
        )
        .expect("direct send");

    // Collect the target owner of every signed publish effect that names one.
    let mut published_peer_targets = Vec::new();
    for effect in result.effects.iter() {
        if let ProtocolEffect::PublishSignedForInnerEvent {
            target_owner_pubkey_hex,
            ..
        } = effect
        {
            if let Some(target) = target_owner_pubkey_hex {
                published_peer_targets.push(target.clone());
            }
        }
    }
    let peer_was_published = published_peer_targets.contains(&peer_owner_hex);
    assert!(
        !peer_was_published,
        "peer owner must not be considered published before peer protocol state exists"
    );

    let owner_marker = format!("owner:{peer_owner_hex}");
    let local_owner_marker = format!("owner:{}", owner.public_key().to_hex());
    assert!(result.queued_targets.contains(&owner_marker));
    assert!(result.queued_targets.contains(&local_owner_marker));

    let snapshot = engine.debug_snapshot();
    assert_eq!(snapshot.pending_outbound_count, 1);
    assert!(snapshot.pending_outbound_targets.contains(&owner_marker));
    assert!(snapshot
        .pending_outbound_targets
        .contains(&local_owner_marker));
}
/// Retries pending outbound work before any peer roster is known: the single retry
/// result must belong to the original message, carry no event ids, include a
/// `FetchProtocolState` effect, and keep the peer `owner:` marker queued.
#[test]
fn appcore_protocol_engine_retry_before_peer_discovery_keeps_missing_roster_pending() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer_owner = Keys::generate();
    let mut engine = test_protocol_engine(&owner, &device);
    observe_current_device_appkeys_for_test(&mut engine, &owner, &device);

    let peer_owner_hex = peer_owner.public_key().to_hex();
    let result = engine
        .send_direct_text(
            peer_owner.public_key(),
            &peer_owner_hex,
            "first",
            None,
            UnixSeconds(3),
        )
        .expect("direct send");
    let owner_marker = format!("owner:{peer_owner_hex}");
    let local_owner_marker = format!("owner:{}", owner.public_key().to_hex());
    assert!(result.queued_targets.contains(&owner_marker));
    assert!(result.queued_targets.contains(&local_owner_marker));

    let retries = engine
        .retry_pending_outbound(NdrUnixSeconds(10_000))
        .expect("retry pending outbound");
    assert_eq!(retries.len(), 1);
    assert_eq!(retries[0].message_id, result.message_id);
    assert!(retries[0].event_ids.is_empty());
    let refetches_protocol_state = retries[0]
        .effects
        .iter()
        .any(|effect| matches!(effect, ProtocolEffect::FetchProtocolState { .. }));
    assert!(
        refetches_protocol_state,
        "retrying missing roster work should re-emit protocol backfill from the engine"
    );
    assert!(retries[0].queued_targets.contains(&owner_marker));

    let snapshot = engine.debug_snapshot();
    assert_eq!(snapshot.pending_outbound_count, 1);
    assert!(snapshot.pending_outbound_targets.contains(&owner_marker));
}
/// Checks that a direct send which fails because storage `put` calls return an
/// injected error leaves the session-manager snapshot unchanged and reports zero
/// pending outbound entries.
#[test]
fn appcore_direct_send_storage_failure_rolls_back_protocol_state() {
let owner = Keys::generate();
let device = Keys::generate();
let peer_owner = Keys::generate();
let peer_device = Keys::generate();
// Storage whose `put` can be switched to fail; reads, deletes and lists pass through.
let storage = Arc::new(SwitchableFailStorage::new());
let mut engine = test_protocol_engine_with_storage(
&owner,
&device,
storage.clone() as Arc<dyn StorageAdapter>,
);
observe_current_device_appkeys_for_test(&mut engine, &owner, &device);
// Give the engine the peer's one-device roster while storage still accepts writes.
engine
.ingest_app_keys_snapshot(
peer_owner.public_key(),
AppKeys::new(vec![DeviceEntry::new(peer_device.public_key(), 1)]),
1,
)
.expect("peer appkeys");
// Build an invite for the peer device (owned by `peer_owner`), sign it with the peer
// device keys, and let the engine observe it.
let mut rng = OsRng;
let mut ctx = ProtocolContext::new(NdrUnixSeconds(2), &mut rng);
let invite = Invite::create_new_with_context(
&mut ctx,
NdrDevicePubkey::from_bytes(peer_device.public_key().to_bytes()),
Some(NdrOwnerPubkey::from_bytes(
peer_owner.public_key().to_bytes(),
)),
None,
)
.expect("peer invite");
let invite_event = nostr_double_ratchet_nostr::invite_unsigned_event(&invite)
.expect("invite event")
.sign_with_keys(&peer_device)
.expect("signed invite");
engine
.observe_invite_event(&invite_event)
.expect("observe invite");
// Capture session state before enabling failures so it can be compared afterwards.
let before = engine.session_manager_snapshot_for_test();
// From here on every storage `put` returns the injected error.
storage.set_fail_puts(true);
let result = engine.send_direct_text(
peer_owner.public_key(),
&peer_owner.public_key().to_hex(),
"rollback",
None,
UnixSeconds(3),
);
assert!(result.is_err());
assert_eq!(
engine.session_manager_snapshot_for_test(),
before,
"failed persistence must roll back in-memory ratchet state"
);
assert_eq!(
engine.debug_snapshot().pending_outbound_count,
0,
"failed persistence must not leave pending outbound state in memory"
);
}
/// Checks that `create_group` failing because storage `put` calls return an injected
/// error leaves the session-manager and group-manager snapshots unchanged and reports
/// zero pending group fanouts.
#[test]
fn appcore_group_create_storage_failure_rolls_back_protocol_state() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer_owner = Keys::generate();
    // Storage whose `put` can be switched to fail on demand.
    let storage = Arc::new(SwitchableFailStorage::new());
    let mut engine = test_protocol_engine_with_storage(
        &owner,
        &device,
        storage.clone() as Arc<dyn StorageAdapter>,
    );
    observe_current_device_appkeys_for_test(&mut engine, &owner, &device);

    // Baseline snapshots, captured while writes still succeed.
    let sessions_baseline = engine.session_manager_snapshot_for_test();
    let groups_baseline = engine.group_manager_snapshot_for_test();

    storage.set_fail_puts(true);
    let members = vec![peer_owner.public_key()];
    let result = engine.create_group("rollback group".to_string(), members, UnixSeconds(3));
    assert!(result.is_err());

    assert_eq!(
        engine.session_manager_snapshot_for_test(),
        sessions_baseline,
        "failed group persistence must roll back session fanout preparation"
    );
    assert_eq!(
        engine.group_manager_snapshot_for_test(),
        groups_baseline,
        "failed group persistence must roll back group manager state"
    );
    assert_eq!(
        engine.debug_snapshot().pending_group_fanout_count,
        0,
        "failed group persistence must not leave pending group fanouts in memory"
    );
}
/// Walks a queued direct send through peer roster discovery and then invite
/// observation: after the peer AppKeys arrive the send waits on the peer device, and
/// observing that device's invite yields a retry result for the original message with
/// one event id and removes the device from the pending outbound targets.
#[test]
fn appcore_invite_event_wakes_device_queued_direct_send_before_retry_delay() {
let owner = Keys::generate();
let device = Keys::generate();
let peer_owner = Keys::generate();
let peer_device = Keys::generate();
let mut engine = test_protocol_engine(&owner, &device);
observe_current_device_appkeys_for_test(&mut engine, &owner, &device);
// Step 1: send before any peer AppKeys have been ingested.
let send = engine
.send_direct_text(
peer_owner.public_key(),
&peer_owner.public_key().to_hex(),
"queued until invite",
None,
UnixSeconds(3),
)
.expect("direct send");
assert!(
send.queued_targets
.contains(&format!("owner:{}", peer_owner.public_key().to_hex())),
"missing peer roster should be queued"
);
assert!(
send.queued_targets
.contains(&format!("owner:{}", owner.public_key().to_hex())),
"local sibling discovery should remain queued until the owner roster is known to have no siblings"
);
// Step 2: ingest the peer's one-device roster; the returned batch carries the retried send.
let app_keys_batch = engine
.ingest_app_keys_snapshot(
peer_owner.public_key(),
AppKeys::new(vec![DeviceEntry::new(peer_device.public_key(), 4)]),
4,
)
.expect("peer appkeys");
assert_eq!(app_keys_batch.direct_results.len(), 1);
// The queued target is now the peer device's hex key rather than the `owner:` marker.
assert!(
app_keys_batch.direct_results[0]
.queued_targets
.contains(&peer_device.public_key().to_hex()),
"after peer roster discovery, the peer device invite should remain missing"
);
assert!(
!app_keys_batch.direct_results[0]
.queued_targets
.contains(&format!("owner:{}", peer_owner.public_key().to_hex())),
"peer owner discovery should be drained after AppKeys arrive"
);
// Step 3: build an invite for the peer device, sign it with the peer device keys,
// and let the engine observe it.
let mut rng = OsRng;
let mut ctx = ProtocolContext::new(NdrUnixSeconds(5), &mut rng);
let invite = Invite::create_new_with_context(
&mut ctx,
NdrDevicePubkey::from_bytes(peer_device.public_key().to_bytes()),
Some(NdrOwnerPubkey::from_bytes(
peer_owner.public_key().to_bytes(),
)),
None,
)
.expect("peer invite");
let invite_event = nostr_double_ratchet_nostr::invite_unsigned_event(&invite)
.expect("invite event")
.sign_with_keys(&peer_device)
.expect("signed invite");
let invite_batch = engine
.observe_invite_event(&invite_event)
.expect("observe invite");
// The invite batch must contain exactly one result, for the original message, with one event id.
assert_eq!(invite_batch.direct_results.len(), 1);
assert_eq!(invite_batch.direct_results[0].message_id, send.message_id);
assert_eq!(invite_batch.direct_results[0].event_ids.len(), 1);
assert!(
!engine
.debug_snapshot()
.pending_outbound_targets
.contains(&peer_device.public_key().to_hex()),
"remote peer fanout should be fully drained"
);
}
/// Core-level check: a direct send queued on a missing peer-device invite must be
/// drained when that invite arrives through `handle_relay_event`, even though the
/// event id was passed to `remember_event` beforehand.
#[test]
fn seen_invite_event_replays_into_protocol_engine_for_queued_send() {
let owner = Keys::generate();
let device = Keys::generate();
let peer_owner = Keys::generate();
let peer_device = Keys::generate();
let mut core = logged_in_test_core("seen-invite-replay", &owner, &device);
// Seed the engine with the local device's AppKeys and the peer's one-device roster.
{
let engine = core.protocol_engine.as_mut().expect("protocol engine");
observe_current_device_appkeys_for_test(engine, &owner, &device);
engine
.ingest_app_keys_snapshot(
peer_owner.public_key(),
AppKeys::new(vec![DeviceEntry::new(peer_device.public_key(), 4)]),
4,
)
.expect("peer appkeys");
}
core.send_direct_message(
&peer_owner.public_key().to_hex(),
"queued until seen invite replays",
UnixSeconds(5),
None,
);
// Precondition: the send is pending on the peer device.
assert!(
core.protocol_engine
.as_ref()
.expect("protocol engine")
.debug_snapshot()
.pending_outbound_targets
.contains(&peer_device.public_key().to_hex()),
"send should wait for the peer device invite"
);
// Build an invite for the peer device and sign it with the peer device keys.
let mut rng = OsRng;
let mut ctx = ProtocolContext::new(NdrUnixSeconds(6), &mut rng);
let invite = Invite::create_new_with_context(
&mut ctx,
NdrDevicePubkey::from_bytes(peer_device.public_key().to_bytes()),
Some(NdrOwnerPubkey::from_bytes(
peer_owner.public_key().to_bytes(),
)),
None,
)
.expect("peer invite");
let invite_event = nostr_double_ratchet_nostr::invite_unsigned_event(&invite)
.expect("invite event")
.sign_with_keys(&peer_device)
.expect("signed invite");
// Presumably marks the event id as already seen before delivery; verify against
// `remember_event`.
core.remember_event(invite_event.id.to_string());
core.handle_relay_event(invite_event);
let debug = core
.protocol_engine
.as_ref()
.expect("protocol engine")
.debug_snapshot();
assert!(
!debug
.pending_outbound_targets
.contains(&peer_device.public_key().to_hex()),
"seen invite events must still rebuild protocol state and drain queued sends"
);
}
/// Follows the sender-copy path for a local sibling device: a send made before local
/// AppKeys are observed queues the local `owner:` marker, ingesting local AppKeys that
/// list an older device turns that into a queued target for the old device, and
/// observing the old device's invite yields a publish effect addressed to it.
#[test]
fn appcore_direct_send_keeps_local_sibling_probe_until_local_appkeys_and_invite_arrive() {
let owner = Keys::generate();
let fresh_device = Keys::generate();
let old_device = Keys::generate();
let peer_owner = Keys::generate();
// The engine runs as `fresh_device`; no AppKeys are observed before the send.
let mut engine = test_protocol_engine(&owner, &fresh_device);
let send = engine
.send_direct_text(
peer_owner.public_key(),
&peer_owner.public_key().to_hex(),
"self sync should not be dropped",
None,
UnixSeconds(3),
)
.expect("direct send");
assert!(
send.queued_targets
.contains(&format!("owner:{}", peer_owner.public_key().to_hex())),
"remote owner discovery should remain queued"
);
assert!(
send.queued_targets
.contains(&format!("owner:{}", owner.public_key().to_hex())),
"local sibling roster discovery must be queued until local AppKeys have been observed"
);
// Local roster: the old device with timestamp 1 and the fresh device stamped with
// the current time.
let local_app_keys_created_at = unix_now().get();
let local_app_keys = AppKeys::new(vec![
DeviceEntry::new(old_device.public_key(), 1),
DeviceEntry::new(fresh_device.public_key(), local_app_keys_created_at),
]);
let app_keys_batch = engine
.ingest_app_keys_snapshot(
owner.public_key(),
local_app_keys,
local_app_keys_created_at,
)
.expect("local appkeys");
assert_eq!(app_keys_batch.direct_results.len(), 1);
assert!(
app_keys_batch.direct_results[0]
.queued_targets
.contains(&old_device.public_key().to_hex()),
"local AppKeys should turn the local owner probe into the old device invite target"
);
// Build an invite for the old device (owned by `owner`), sign it with the old
// device keys, and let the engine observe it.
let mut rng = OsRng;
let mut ctx = ProtocolContext::new(NdrUnixSeconds(5), &mut rng);
let old_invite = Invite::create_new_with_context(
&mut ctx,
NdrDevicePubkey::from_bytes(old_device.public_key().to_bytes()),
Some(NdrOwnerPubkey::from_bytes(owner.public_key().to_bytes())),
None,
)
.expect("old device invite");
let old_invite_event = nostr_double_ratchet_nostr::invite_unsigned_event(&old_invite)
.expect("invite event")
.sign_with_keys(&old_device)
.expect("signed invite");
let invite_batch = engine
.observe_invite_event(&old_invite_event)
.expect("observe old device invite");
assert_eq!(invite_batch.direct_results.len(), 1);
let retry = &invite_batch.direct_results[0];
assert_eq!(retry.message_id, send.message_id);
// Accept either effect shape, as long as it targets (owner, old_device): a staged
// first-contact payload entry, or a signed publish for the inner event.
assert!(
retry.effects.iter().any(|effect| matches!(
effect,
ProtocolEffect::PublishStagedFirstContact { payload, .. }
if payload.iter().any(|publish| publish.target_owner_pubkey_hex.as_deref()
== Some(owner.public_key().to_hex().as_str())
&& publish.target_device_id.as_deref()
== Some(old_device.public_key().to_hex().as_str()))
)) || retry.effects.iter().any(|effect| matches!(
effect,
ProtocolEffect::PublishSignedForInnerEvent {
target_owner_pubkey_hex,
target_device_id,
..
} if target_owner_pubkey_hex.as_deref() == Some(owner.public_key().to_hex().as_str())
&& target_device_id.as_deref() == Some(old_device.public_key().to_hex().as_str())
)),
"old local device should receive a sender-copy publish after its invite arrives"
);
}
/// A message sent to one's own owner key while a sibling device has no
/// observed invite must stay queued for that sibling, and observing the
/// sibling's invite must produce a retry result that publishes to it.
#[test]
fn self_direct_send_retries_to_restored_sibling_after_invite_arrives() {
    let owner = Keys::generate();
    let phone_device = Keys::generate();
    let desktop_device = Keys::generate();
    let owner_hex = owner.public_key().to_hex();
    let desktop_hex = desktop_device.public_key().to_hex();
    let mut engine = test_protocol_engine(&owner, &phone_device);

    // The local roster lists both devices before anything is sent.
    let local_roster = AppKeys::new(vec![
        DeviceEntry::new(phone_device.public_key(), 1),
        DeviceEntry::new(desktop_device.public_key(), 1),
    ]);
    engine
        .ingest_app_keys_snapshot(owner.public_key(), local_roster, 1)
        .expect("local AppKeys");

    // Self-send: the recipient is the local owner.
    let send = engine
        .send_direct_text(
            owner.public_key(),
            &owner_hex,
            "self message should reach the restored sibling",
            None,
            UnixSeconds(3),
        )
        .expect("self direct send");
    assert!(
        send.queued_targets.contains(&desktop_hex),
        "sibling device should stay queued until its invite is observed"
    );

    // Build the desktop invite, sign it with the desktop keys, and observe it.
    let mut rng = OsRng;
    let mut ctx = ProtocolContext::new(NdrUnixSeconds(5), &mut rng);
    let inviter_device = NdrDevicePubkey::from_bytes(desktop_device.public_key().to_bytes());
    let inviter_owner = NdrOwnerPubkey::from_bytes(owner.public_key().to_bytes());
    let desktop_invite =
        Invite::create_new_with_context(&mut ctx, inviter_device, Some(inviter_owner), None)
            .expect("desktop invite");
    let unsigned_invite =
        nostr_double_ratchet_nostr::invite_unsigned_event(&desktop_invite).expect("invite event");
    let desktop_invite_event = unsigned_invite
        .sign_with_keys(&desktop_device)
        .expect("signed invite");
    let invite_batch = engine
        .observe_invite_event(&desktop_invite_event)
        .expect("observe desktop invite");

    // A result counts only if it belongs to the original send, no longer lists
    // the desktop as queued, and carries a publish addressed to the desktop.
    let addressed_to_desktop = |owner_field: Option<&str>, device_field: Option<&str>| {
        owner_field == Some(owner_hex.as_str()) && device_field == Some(desktop_hex.as_str())
    };
    let retried_to_desktop = invite_batch.direct_results.iter().any(|result| {
        if !(result.message_id == send.message_id) {
            return false;
        }
        if result.queued_targets.contains(&desktop_hex) {
            return false;
        }
        let staged_first_contact = result.effects.iter().any(|effect| {
            matches!(
                effect,
                ProtocolEffect::PublishStagedFirstContact { payload, .. }
                    if payload.iter().any(|publish| addressed_to_desktop(
                        publish.target_owner_pubkey_hex.as_deref(),
                        publish.target_device_id.as_deref(),
                    ))
            )
        });
        let signed_inner_event = result.effects.iter().any(|effect| {
            matches!(
                effect,
                ProtocolEffect::PublishSignedForInnerEvent {
                    target_owner_pubkey_hex,
                    target_device_id,
                    ..
                } if addressed_to_desktop(
                    target_owner_pubkey_hex.as_deref(),
                    target_device_id.as_deref(),
                )
            )
        });
        staged_first_contact || signed_inner_event
    });
    assert!(
        retried_to_desktop,
        "observing the sibling invite should retry the queued self-send to that device"
    );
}
/// Ingesting a local AppKeys snapshot with an old timestamp (`1`) after a send
/// has already been queued must still replace the owner-level sibling probe
/// with the concrete old device, both in the retry result and in the pending
/// outbound detail exposed by the debug snapshot.
#[test]
fn appcore_local_appkeys_backfill_replaces_seeded_single_device_roster() {
    let owner = Keys::generate();
    let fresh_device = Keys::generate();
    let old_device = Keys::generate();
    let peer_owner = Keys::generate();
    let old_device_hex = old_device.public_key().to_hex();
    let peer_owner_hex = peer_owner.public_key().to_hex();
    let local_owner_probe = format!("owner:{}", owner.public_key().to_hex());
    let mut engine = test_protocol_engine(&owner, &fresh_device);

    // Send before any local AppKeys snapshot has been ingested.
    let send = engine
        .send_direct_text(
            peer_owner.public_key(),
            &peer_owner_hex,
            "older local appkeys should still discover old sibling",
            None,
            UnixSeconds(3),
        )
        .expect("direct send");
    assert!(
        send.queued_targets.contains(&local_owner_probe),
        "freshly seeded local device should start with owner-level sibling discovery"
    );

    // Backfill a roster whose entries and snapshot timestamp are all `1`.
    let merged_roster = AppKeys::new(vec![
        DeviceEntry::new(old_device.public_key(), 1),
        DeviceEntry::new(fresh_device.public_key(), 1),
    ]);
    let batch = engine
        .ingest_app_keys_snapshot(owner.public_key(), merged_roster, 1)
        .expect("stale local appkeys");
    assert_eq!(batch.direct_results.len(), 1);
    let roster_retry = &batch.direct_results[0];
    assert!(
        roster_retry.queued_targets.contains(&old_device_hex),
        "AppCore's merged local roster is authoritative even when its event timestamp predates the seeded local invite"
    );

    // The pending outbound detail for this send must name the old device and
    // must no longer carry the owner-level probe.
    let snapshot = engine.debug_snapshot();
    let pending = snapshot
        .pending_outbound_details
        .iter()
        .find(|detail| detail.message_id == send.message_id)
        .expect("pending send detail");
    assert_eq!(
        pending.remaining_local_sibling_targets,
        vec![old_device_hex.clone()]
    );
    let still_probing_owner = pending.queued_targets.contains(&local_owner_probe);
    assert!(
        !still_probing_owner,
        "once the merged local roster is installed, retries should target the concrete old device"
    );
}
/// After the peer accepts the local invite and the engine observes the
/// resulting invite-response event, the engine must report at least one
/// message-author pubkey for the peer owner.
#[test]
fn invite_response_observation_installs_session_author_state() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer_owner = Keys::generate();
    let peer_device = Keys::generate();
    let mut engine = test_protocol_engine(&owner, &device);

    // The peer's roster is ingested before the invite handshake.
    let peer_roster = AppKeys::new(vec![DeviceEntry::new(peer_device.public_key(), 1)]);
    engine
        .ingest_app_keys_snapshot(peer_owner.public_key(), peer_roster, 1)
        .expect("peer appkeys");

    // The peer device accepts the local invite, claiming `peer_owner` as owner.
    let local_invite = engine.local_invite_for_test().expect("local invite");
    let (_peer_session, response) = local_invite
        .accept_with_owner(
            peer_device.public_key(),
            peer_device.secret_key().to_secret_bytes(),
            Some(peer_device.public_key().to_hex()),
            Some(peer_owner.public_key()),
        )
        .expect("peer accepts invite");
    let response_event = nostr_double_ratchet_nostr::invite_response_event(&response)
        .expect("invite response event");
    engine
        .observe_invite_response_event(&response_event)
        .expect("observe invite response");

    let peer_authors = engine.message_author_pubkeys_for_owner(peer_owner.public_key());
    assert!(
        !peer_authors.is_empty(),
        "observing the invite response should install receiver state for the peer"
    );
}
/// A direct message from a device whose owner claim has no AppKeys backing yet
/// must be parked as pending inbound (not returned as decrypted), and must be
/// delivered, attributed to the claimed owner, once that owner's AppKeys
/// listing the device are ingested.
#[test]
fn appcore_direct_message_from_unverified_claimed_owner_retries_after_appkeys() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer_owner = Keys::generate();
    let peer_device = Keys::generate();
    let peer_owner_hex = peer_owner.public_key().to_hex();
    let mut engine = test_protocol_engine(&owner, &device);

    // Handshake: the peer accepts the local invite. No peer AppKeys have been
    // ingested at this point.
    let local_invite = engine.local_invite_for_test().expect("local invite");
    let (mut peer_session, response) = local_invite
        .accept_with_owner(
            peer_device.public_key(),
            peer_device.secret_key().to_secret_bytes(),
            Some(peer_device.public_key().to_hex()),
            Some(peer_owner.public_key()),
        )
        .expect("peer accepts invite");
    let response_event = nostr_double_ratchet_nostr::invite_response_event(&response)
        .expect("invite response event");
    engine
        .observe_invite_response_event(&response_event)
        .expect("observe invite response");

    // The peer sends one message over the new session.
    let send_plan = peer_session
        .plan_send(b"hello-before-appkeys", NdrUnixSeconds(11))
        .expect("peer plans message");
    let sent = peer_session.apply_send(send_plan);
    let sender_message_pubkey_hex = sent.envelope.sender.to_hex();
    let message_event =
        nostr_double_ratchet_nostr::message_event(&sent.envelope).expect("message event");

    // Processing must park the message rather than return it.
    let decrypted = engine
        .process_direct_message_event(&message_event)
        .expect("process direct message");
    assert!(
        decrypted.is_none(),
        "claimed-owner messages must wait until the owner claim is verified"
    );
    assert_eq!(engine.debug_snapshot().pending_inbound_count, 1);

    // Inspect the parked entry's recorded metadata.
    let parked_entries = engine.pending_inbound_for_test();
    let parked = parked_entries.first().expect("pending inbound");
    assert_eq!(parked.event_id, message_event.id.to_string());
    assert!(
        parked.has_envelope,
        "pending inbound must store the parsed envelope so retries do not verify the outer event again"
    );
    assert_eq!(
        parked.sender_message_pubkey_hex.as_deref(),
        Some(sender_message_pubkey_hex.as_str())
    );
    assert_eq!(
        parked.claimed_owner_pubkey_hex.as_deref(),
        Some(peer_owner_hex.as_str())
    );
    assert!(
        parked.metadata_verified,
        "queued pending inbound metadata should be produced by the already verified parse"
    );
    let expected_claim_targets = vec![format!("owner:{}", peer_owner_hex)];
    assert_eq!(engine.queued_owner_claim_targets(), expected_claim_targets);

    // Ingesting the peer's AppKeys must release the parked message.
    let peer_roster = AppKeys::new(vec![DeviceEntry::new(peer_device.public_key(), 12)]);
    let batch = engine
        .ingest_app_keys_snapshot(peer_owner.public_key(), peer_roster, 12)
        .expect("peer appkeys");
    assert_eq!(batch.direct_messages.len(), 1);
    let delivered = &batch.direct_messages[0];
    assert_eq!(delivered.sender, peer_owner.public_key());
    assert_eq!(delivered.sender_device, Some(peer_device.public_key()));
    assert_eq!(delivered.content, "hello-before-appkeys");
    assert_eq!(engine.debug_snapshot().pending_inbound_count, 0);
}
/// A group pairwise payload from a device whose owner claim has no AppKeys
/// backing yet must be consumed and queued without emitting events; ingesting
/// the owner's AppKeys must then emit the metadata update with `created_by`
/// set to the peer owner and clear the pending payload count.
#[test]
fn appcore_pending_group_payload_from_claimed_device_uses_owner_after_appkeys() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer_owner = Keys::generate();
    let peer_device = Keys::generate();
    let mut engine = test_protocol_engine(&owner, &device);

    // Handshake: the peer accepts the local invite. No peer AppKeys have been
    // ingested at this point.
    let local_invite = engine.local_invite_for_test().expect("local invite");
    let (_peer_session, response) = local_invite
        .accept_with_owner(
            peer_device.public_key(),
            peer_device.secret_key().to_secret_bytes(),
            Some(peer_device.public_key().to_hex()),
            Some(peer_owner.public_key()),
        )
        .expect("peer accepts invite");
    let response_event = nostr_double_ratchet_nostr::invite_response_event(&response)
        .expect("invite response event");
    engine
        .observe_invite_response_event(&response_event)
        .expect("observe invite response");

    // Encode a metadata-snapshot command as the peer device.
    let group_id = "claimed-owner-group".to_string();
    let group_snapshot = test_group_snapshot(
        &group_id,
        "Claimed Owner Group",
        peer_owner.public_key(),
        vec![peer_owner.public_key(), owner.public_key()],
        vec![peer_owner.public_key()],
        1,
    );
    let codec = nostr_double_ratchet_nostr::JsonGroupPayloadCodecV1;
    let encode_context = nostr_double_ratchet::GroupPayloadEncodeContext {
        local_device_pubkey: ndr_device_pubkey(peer_device.public_key()),
        created_at: NdrUnixSeconds(11),
    };
    let command = nostr_double_ratchet::GroupPairwiseCommand::MetadataSnapshot {
        snapshot: group_snapshot,
    };
    let payload = nostr_double_ratchet::GroupPayloadCodec::encode_pairwise_command(
        &codec,
        encode_context,
        &command,
    )
    .expect("group metadata payload");

    // The payload is handed over with the peer device key in both key arguments.
    let outcome = engine
        .process_group_pairwise_payload(
            &payload,
            peer_device.public_key(),
            Some(peer_device.public_key()),
        )
        .expect("process group payload");
    assert!(outcome.consumed);
    assert!(outcome.events.is_empty());
    let expected_targets = vec![format!("owner:{}", peer_owner.public_key().to_hex())];
    assert_eq!(outcome.queued_targets, expected_targets);
    assert_eq!(
        engine.debug_snapshot().pending_group_pairwise_payload_count,
        1
    );

    // Ingesting the peer's AppKeys must apply the queued payload.
    let peer_roster = AppKeys::new(vec![DeviceEntry::new(peer_device.public_key(), 12)]);
    let batch = engine
        .ingest_app_keys_snapshot(peer_owner.public_key(), peer_roster, 12)
        .expect("peer appkeys");
    let created = batch
        .group_result
        .events
        .iter()
        .find_map(|event| {
            if let GroupIncomingEvent::MetadataUpdated(updated) = event {
                if updated.group_id == group_id {
                    return Some(updated);
                }
            }
            None
        })
        .expect("group metadata applied after owner claim verification");
    assert_eq!(
        created.created_by,
        ndr_owner_pubkey(peer_owner.public_key())
    );
    assert_eq!(
        engine.debug_snapshot().pending_group_pairwise_payload_count,
        0
    );
}
/// Sending a direct message from a logged-in core must leave a liveness
/// deadline set on the protocol subscription runtime, and that deadline must
/// be no later than five seconds from now.
#[test]
fn queued_direct_send_schedules_fast_protocol_retry_tick() {
    let owner = Keys::generate();
    let device = Keys::generate();
    let peer = Keys::generate();
    let mut core = logged_in_test_core("queued-direct-fast-retry", &owner, &device);

    // Use the same `.invalid` relay URL for the preferences and the session.
    let relay = "wss://relay.invalid".to_string();
    let relay_urls = relay_urls_from_strings(&[relay.clone()]);
    core.preferences.nostr_relay_urls = vec![relay];
    core.logged_in.as_mut().expect("logged in").relay_urls = relay_urls;

    let peer_hex = peer.public_key().to_hex();
    core.send_direct_message(
        &peer_hex,
        "queued until app keys arrive",
        UnixSeconds(1_777_000_000),
        None,
    );

    let liveness_due_at = core
        .protocol_subscription_runtime
        .liveness_due_at
        .expect("queued protocol work should schedule liveness");
    // The cutoff is taken after the deadline is read, matching the order in
    // which the comparison operands are evaluated.
    let fast_retry_cutoff = Instant::now() + Duration::from_secs(5);
    assert!(
        liveness_due_at <= fast_retry_cutoff,
        "queued direct work should schedule a fast retry tick, not wait for the normal liveness interval"
    );
}