Skip to main content

iris_chat_core/
lib.rs

1mod actions;
2mod core;
3pub mod desktop_nearby;
4mod emoji;
5pub mod image_proxy;
6pub mod local_relay;
7pub mod perflog;
8mod qr;
9mod state;
10mod test_fixtures;
11pub mod update_policy;
12mod updates;
13
14use std::any::Any;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::sync::{Arc, RwLock};
17use std::thread;
18use std::time::{Duration, SystemTime, UNIX_EPOCH};
19use std::{panic, panic::AssertUnwindSafe};
20
21use flume::{Receiver, Sender};
22
23pub use actions::AppAction;
24pub use emoji::*;
25pub use qr::*;
26pub use state::*;
27pub use test_fixtures::*;
28pub use update_policy::UpdateAutoCheckPolicy;
29pub use updates::*;
30
31use crate::core::AppCore;
32
33uniffi::setup_scaffolding!();
34
/// Toast text telling the user that Iris needs a restart and that a support
/// bundle can be copied from Settings.
///
/// NOTE(review): the code that shows this toast is outside this view —
/// presumably it is surfaced after a core failure; confirm at the call sites.
pub(crate) const CORE_RESTART_TOAST: &str = "Iris needs restart. Copy support bundle in Settings.";
36
37fn enqueue_update_for_delivery(
38    update: AppUpdate,
39    latest_full_state: &mut Option<AppUpdate>,
40    before_full_state: &mut Vec<AppUpdate>,
41    after_full_state: &mut Vec<AppUpdate>,
42) {
43    match update {
44        full @ AppUpdate::FullState(_) => *latest_full_state = Some(full),
45        nearby @ AppUpdate::NearbyPublishedEvent { .. } => after_full_state.push(nearby),
46        other => before_full_state.push(other),
47    }
48}
49
/// Callback interface (exported through UniFFI) that receives [`AppUpdate`]s.
///
/// `FfiApp::listen_for_updates` calls `reconcile` from its `iris-updates`
/// thread and wraps every call in `catch_unwind`, so a panicking
/// implementation loses that one update but does not stop delivery.
#[uniffi::export(callback_interface)]
pub trait AppReconciler: Send + Sync + 'static {
    /// Handles a single update pushed by the core.
    fn reconcile(&self, update: AppUpdate);
}
54
/// One peer entry inside a [`DesktopNearbySnapshot`], exported as a UniFFI
/// record.
///
/// NOTE(review): the values are produced in `desktop_nearby`, which is not
/// visible here; the per-field notes below are read off the field names and
/// types only — confirm against the producer.
#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq)]
pub struct DesktopNearbyPeerSnapshot {
    /// Identifier of the peer.
    pub id: String,
    /// Name of the peer.
    pub name: String,
    /// Presumably the owner's public key as hex; `None` when not available.
    pub owner_pubkey_hex: Option<String>,
    /// Presumably a URL for the peer's picture; `None` when not available.
    pub picture_url: Option<String>,
    /// Presumably the id of the peer's profile event; `None` when not available.
    pub profile_event_id: Option<String>,
    /// Presumably a "last seen" timestamp in seconds — TODO confirm the epoch.
    pub last_seen_secs: u64,
}
64
/// Snapshot of the desktop nearby service, exported as a UniFFI record.
///
/// Returned by `FfiDesktopNearby::snapshot` and passed to
/// `DesktopNearbyObserver::desktop_nearby_changed`.
///
/// NOTE(review): field semantics are defined in `desktop_nearby`, outside this
/// view — confirm there.
#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq)]
pub struct DesktopNearbySnapshot {
    /// Visibility flag reported by the service.
    pub visible: bool,
    /// Status string reported by the service.
    pub status: String,
    /// Peers included in this snapshot.
    pub peers: Vec<DesktopNearbyPeerSnapshot>,
}
71
/// Callback interface (exported through UniFFI) that receives
/// [`DesktopNearbySnapshot`]s.
///
/// An implementation is handed to `FfiDesktopNearby::new`, which passes it on
/// to `desktop_nearby::DesktopNearbyService`.
///
/// NOTE(review): when the callback fires is decided inside `desktop_nearby`,
/// which is not visible here.
#[uniffi::export(callback_interface)]
pub trait DesktopNearbyObserver: Send + Sync + 'static {
    /// Called with a snapshot of the nearby service.
    fn desktop_nearby_changed(&self, snapshot: DesktopNearbySnapshot);
}
76
/// Per-FFI-method call counters that feed the release-gate budget
/// tests. Every `FfiApp::*` entry point bumps the matching atomic at
/// the top of the call so a misbehaving shell that re-enters the
/// core in a hot loop shows up as an obvious counter spike.
///
/// We track FFI surface area (not internal core counters) because
/// the categorical heat bugs we hit have all been "the shell
/// re-evaluated something and called us N times instead of once" —
/// e.g. the iOS chat-list search re-firing on every body re-eval.
/// Adding finer-grained counters inside the core is a future move
/// if needed; the FFI line is what we can confidently budget today
/// because each entry corresponds to one observable shell action.
///
/// Each field is named after the `FfiApp` method it counts;
/// `FfiApp::perf_counters` copies the current values into an
/// [`FfiPerfCountersSnapshot`].
#[derive(Default, Debug)]
pub(crate) struct FfiPerfCounters {
    pub state: AtomicU64,
    pub dispatch: AtomicU64,
    pub search: AtomicU64,
    pub ingest_nearby_event_json: AtomicU64,
    pub export_support_bundle_json: AtomicU64,
    pub peer_profile_debug: AtomicU64,
    pub mutual_groups: AtomicU64,
    pub prepare_for_suspend: AtomicU64,
}
100
/// Plain-value copy of the per-FFI-method call counters, exported as a UniFFI
/// record and returned by `FfiApp::perf_counters`.
///
/// Every field carries the value loaded from the `FfiPerfCounters` atomic of
/// the same name.
#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq, Default)]
pub struct FfiPerfCountersSnapshot {
    pub state: u64,
    pub dispatch: u64,
    pub search: u64,
    pub ingest_nearby_event_json: u64,
    pub export_support_bundle_json: u64,
    pub peer_profile_debug: u64,
    pub mutual_groups: u64,
    pub prepare_for_suspend: u64,
}
112
/// Core-internal hot-loop counters. FFI surface counters can only
/// catch shells that re-enter the core in a loop — the
/// build_runtime_debug_snapshot regression that caused the macOS CPU
/// loop was entirely internal (relay events fanned out through
/// `persist_best_effort_inner` → `persist_debug_snapshot_best_effort`
/// → full SessionManager clone × N known users). The release-gate
/// budget tests assert on this snapshot too so the next time core
/// work explodes per event, CI catches it before a device does.
///
/// Returned by `FfiApp::core_perf_counters`, which falls back to the
/// `Default` (all-zero) value when the core does not answer.
#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq, Default)]
pub struct CorePerfCountersSnapshot {
    /// Copied from the core's reply.
    /// NOTE(review): presumably the number of runtime debug snapshots the core
    /// has built; the counter is maintained outside this view — confirm.
    pub debug_snapshot_builds: u64,
}
125
/// UniFFI object through which a shell talks to the core.
///
/// Requests go to the core as `CoreMsg`s over the foreground/background
/// channels, updates come back on `update_rx`, and a few read-only calls are
/// answered directly from `shared_state` / `shared_db`.
#[derive(uniffi::Object)]
pub struct FfiApp {
    /// Sender for `dispatch` and for the request/reply calls
    /// (`core_perf_counters`, `export_support_bundle_json`,
    /// `peer_profile_debug`, `mutual_groups`, `prepare_for_suspend`, `shutdown`).
    foreground_tx: Sender<CoreMsg>,
    /// Receiving end of the foreground queue; in this impl it is only read
    /// for its `len()` in the support-bundle diagnostics.
    foreground_rx: Receiver<CoreMsg>,
    /// Sender for nearby-event ingestion and nearby presence-event building.
    background_tx: Sender<CoreMsg>,
    /// Receiving end of the background queue; in this impl it is only read
    /// for its `len()` in the support-bundle diagnostics.
    background_rx: Receiver<CoreMsg>,
    /// Updates emitted by the core; cloned into the `iris-updates` thread by
    /// `listen_for_updates`.
    update_rx: Receiver<AppUpdate>,
    /// Set once `listen_for_updates` has started its thread, so later calls
    /// become no-ops.
    listening: AtomicBool,
    /// State read by `state`, `search` and the `chat_snapshot*` calls.
    shared_state: Arc<RwLock<AppState>>,
    /// Shared SQLite handle used by direct read FFI calls. The core
    /// supervisor swaps this when it recreates `AppCore` after a panic.
    shared_db: Arc<RwLock<Option<crate::core::SharedConnection>>>,
    /// Per-method FFI call counters, exposed through `perf_counters`.
    perf: FfiPerfCounters,
    /// Queue/batch metrics reported in the `ffi_queue` section of the
    /// support bundle.
    queue_metrics: Arc<CoreQueueMetrics>,
    /// Cached restore action plus panic/restart bookkeeping.
    recovery: Arc<CoreRecoveryState>,
}
142
/// Lock-free bookkeeping about core message batches.
///
/// Written through `mark_batch_start` / `mark_batch_finished` and read back by
/// `FfiApp::support_bundle_json_with_ffi_diagnostics` for the `ffi_queue`
/// section of the support bundle.
#[derive(Default, Debug)]
struct CoreQueueMetrics {
    /// Running total of foreground messages, added to when a batch finishes.
    foreground_processed: AtomicU64,
    /// Running total of background messages, added to when a batch finishes.
    background_processed: AtomicU64,
    /// `true` between `mark_batch_start` and `mark_batch_finished`.
    batch_active: AtomicBool,
    /// `perflog::now_ms()` at the most recent `mark_batch_start`; 0 until then.
    last_batch_started_at_ms: AtomicU64,
    /// `perflog::now_ms()` at the most recent `mark_batch_finished`; 0 until then.
    last_batch_finished_at_ms: AtomicU64,
    /// `size` passed to the most recent `mark_batch_start`.
    last_batch_size: AtomicU64,
    /// `foreground` passed to the most recent `mark_batch_start`.
    last_batch_foreground_count: AtomicU64,
    /// `background` passed to the most recent `mark_batch_start`.
    last_batch_background_count: AtomicU64,
}
154
155impl CoreQueueMetrics {
156    fn mark_batch_start(&self, size: u64, foreground: u64, background: u64) {
157        self.last_batch_started_at_ms
158            .store(crate::perflog::now_ms(), Ordering::Relaxed);
159        self.last_batch_size.store(size, Ordering::Relaxed);
160        self.last_batch_foreground_count
161            .store(foreground, Ordering::Relaxed);
162        self.last_batch_background_count
163            .store(background, Ordering::Relaxed);
164        self.batch_active.store(true, Ordering::Release);
165    }
166
167    fn mark_batch_finished(&self, foreground: u64, background: u64) {
168        self.foreground_processed
169            .fetch_add(foreground, Ordering::Relaxed);
170        self.background_processed
171            .fetch_add(background, Ordering::Relaxed);
172        self.last_batch_finished_at_ms
173            .store(crate::perflog::now_ms(), Ordering::Relaxed);
174        self.batch_active.store(false, Ordering::Release);
175    }
176}
177
/// Bookkeeping kept outside the core: the action that can restore the current
/// session, plus panic/restart statistics.
///
/// NOTE(review): the code that consumes `restore_action()` and calls
/// `mark_panic` is outside this view — presumably the core supervisor; confirm.
#[derive(Default, Debug)]
struct CoreRecoveryState {
    /// Most recent `RestoreSession` / `RestoreAccountBundle` action seen (or
    /// one rebuilt from a `PersistAccountBundle` update); cleared on `Logout`.
    restore_action: RwLock<Option<AppAction>>,
    /// Number of times `mark_panic` has been called.
    restart_count: AtomicU64,
    /// Detail string passed to the most recent `mark_panic`.
    last_panic: RwLock<Option<String>>,
}
184
185impl CoreRecoveryState {
186    fn remember_action(&self, action: &AppAction) {
187        match action {
188            AppAction::RestoreSession { .. } | AppAction::RestoreAccountBundle { .. } => {
189                self.set_restore_action(Some(action.clone()));
190            }
191            AppAction::Logout => self.set_restore_action(None),
192            _ => {}
193        }
194    }
195
196    fn remember_update(&self, update: &AppUpdate) {
197        if let AppUpdate::PersistAccountBundle {
198            owner_nsec,
199            owner_pubkey_hex,
200            device_nsec,
201            ..
202        } = update
203        {
204            self.set_restore_action(Some(AppAction::RestoreAccountBundle {
205                owner_nsec: owner_nsec.clone(),
206                owner_pubkey_hex: owner_pubkey_hex.clone(),
207                device_nsec: device_nsec.clone(),
208            }));
209        }
210    }
211
212    fn restore_action(&self) -> Option<AppAction> {
213        match self.restore_action.read() {
214            Ok(action) => action.clone(),
215            Err(poison) => poison.into_inner().clone(),
216        }
217    }
218
219    fn mark_panic(&self, detail: String) -> u64 {
220        match self.last_panic.write() {
221            Ok(mut slot) => *slot = Some(detail),
222            Err(poison) => *poison.into_inner() = Some(detail),
223        }
224        self.restart_count.fetch_add(1, Ordering::Relaxed) + 1
225    }
226
227    fn restart_count(&self) -> u64 {
228        self.restart_count.load(Ordering::Relaxed)
229    }
230
231    fn last_panic(&self) -> Option<String> {
232        match self.last_panic.read() {
233            Ok(slot) => slot.clone(),
234            Err(poison) => poison.into_inner().clone(),
235        }
236    }
237
238    fn set_restore_action(&self, action: Option<AppAction>) {
239        match self.restore_action.write() {
240            Ok(mut slot) => *slot = action,
241            Err(poison) => *poison.into_inner() = action,
242        }
243    }
244}
245
/// UniFFI object wrapping a `desktop_nearby::DesktopNearbyService`; its
/// exported methods forward to that service.
#[derive(uniffi::Object)]
pub struct FfiDesktopNearby {
    /// The wrapped service.
    service: Arc<desktop_nearby::DesktopNearbyService>,
}
250
251#[uniffi::export]
252impl FfiApp {
253    #[uniffi::constructor]
254    pub fn new(data_dir: String, _keychain_group: String, _app_version: String) -> Arc<Self> {
255        match panic::catch_unwind(AssertUnwindSafe(|| new_ffi_app_inner(data_dir))) {
256            Ok(app) => app,
257            Err(payload) => ffi_app_failure(format!(
258                "Iris could not start: {}",
259                panic_payload_to_string(payload)
260            )),
261        }
262    }
263
264    pub fn state(&self) -> AppState {
265        self.perf.state.fetch_add(1, Ordering::Relaxed);
266        ffi_or("ffiapp.state", ffi_failure_state(), || {
267            match self.shared_state.read() {
268                Ok(slot) => slot.clone(),
269                Err(poison) => poison.into_inner().clone(),
270            }
271        })
272    }
273
274    pub fn dispatch(&self, action: AppAction) {
275        self.perf.dispatch.fetch_add(1, Ordering::Relaxed);
276        ffi_or("ffiapp.dispatch", (), || {
277            crate::perflog!("ffi.dispatch action={:?}", std::mem::discriminant(&action));
278            self.recovery.remember_action(&action);
279            let _ = self.foreground_tx.send(CoreMsg::Action(action));
280        })
281    }
282
283    /// Snapshot of the per-FFI-method call counts since `FfiApp` was
284    /// created. Used by `tests/perf_budgets.rs` to assert that hot
285    /// shell paths stay within their expected FFI traffic. Reading
286    /// is best-effort `Relaxed` — call counts are advisory not
287    /// transactional.
288    pub fn perf_counters(&self) -> FfiPerfCountersSnapshot {
289        FfiPerfCountersSnapshot {
290            state: self.perf.state.load(Ordering::Relaxed),
291            dispatch: self.perf.dispatch.load(Ordering::Relaxed),
292            search: self.perf.search.load(Ordering::Relaxed),
293            ingest_nearby_event_json: self.perf.ingest_nearby_event_json.load(Ordering::Relaxed),
294            export_support_bundle_json: self
295                .perf
296                .export_support_bundle_json
297                .load(Ordering::Relaxed),
298            peer_profile_debug: self.perf.peer_profile_debug.load(Ordering::Relaxed),
299            mutual_groups: self.perf.mutual_groups.load(Ordering::Relaxed),
300            prepare_for_suspend: self.perf.prepare_for_suspend.load(Ordering::Relaxed),
301        }
302    }
303
304    /// Snapshot of core-internal hot-loop counters. Used by
305    /// `tests/perf_budgets.rs` to budget work that happens entirely
306    /// inside the core thread — the FFI surface counters above can't
307    /// see those. Default snapshot on timeout so a wedged core can't
308    /// pin the test on a perpetual wait.
309    pub fn core_perf_counters(&self) -> CorePerfCountersSnapshot {
310        ffi_or(
311            "ffiapp.core_perf_counters",
312            CorePerfCountersSnapshot::default(),
313            || {
314                let (reply_tx, reply_rx) = flume::bounded(1);
315                if self
316                    .foreground_tx
317                    .send(CoreMsg::CorePerfCounters(reply_tx))
318                    .is_err()
319                {
320                    return CorePerfCountersSnapshot::default();
321                }
322                match reply_rx.recv_timeout(Duration::from_secs(2)) {
323                    Ok(snapshot) => CorePerfCountersSnapshot {
324                        debug_snapshot_builds: snapshot.debug_snapshot_builds,
325                    },
326                    Err(_) => CorePerfCountersSnapshot::default(),
327                }
328            },
329        )
330    }
331
332    /// Grouped Signal-style search: filters the in-memory chat list
333    /// into contacts/groups by display name + subtitle + chat id, and
334    /// runs the SQLite FTS5 index for the messages section. Optional
335    /// `scope_chat_id` restricts message hits to a single thread (the
336    /// "search in this chat" pill in the desktop sidebar). Returns an
337    /// empty snapshot for empty / whitespace queries.
338    pub fn search(
339        &self,
340        query: String,
341        scope_chat_id: Option<String>,
342        limit: u32,
343    ) -> SearchResultSnapshot {
344        self.perf.search.fetch_add(1, Ordering::Relaxed);
345        ffi_or(
346            "ffiapp.search",
347            SearchResultSnapshot::empty(query.clone(), scope_chat_id.clone()),
348            || {
349                let trimmed = query.trim();
350                if trimmed.is_empty() {
351                    return SearchResultSnapshot::empty(query.clone(), scope_chat_id.clone());
352                }
353                let limit = limit.max(1) as usize;
354                let state_snapshot = match self.shared_state.read() {
355                    Ok(slot) => slot.clone(),
356                    Err(poison) => poison.into_inner().clone(),
357                };
358                let (contacts, groups) = if scope_chat_id.is_some() {
359                    (Vec::new(), Vec::new())
360                } else {
361                    filter_threads_for_search(&state_snapshot.chat_list, trimmed)
362                };
363                let shared_db = self.shared_db_snapshot();
364                let messages = match shared_db.as_ref() {
365                    Some(shared) => match shared.lock() {
366                        Ok(conn) => crate::core::search_messages_fts(
367                            &conn,
368                            trimmed,
369                            scope_chat_id.as_deref(),
370                            limit,
371                        )
372                        .unwrap_or_default(),
373                        Err(poison) => crate::core::search_messages_fts(
374                            &poison.into_inner(),
375                            trimmed,
376                            scope_chat_id.as_deref(),
377                            limit,
378                        )
379                        .unwrap_or_default(),
380                    },
381                    None => Vec::new(),
382                };
383                let enriched = enrich_message_hits(messages, &state_snapshot.chat_list);
384                // The shortcut row only makes sense for global search.
385                // Once the user has scoped to a single chat, an npub
386                // paste should still search that chat's messages, not
387                // jump out of the scope.
388                let shortcut = if scope_chat_id.is_none() {
389                    chat_input_shortcut(trimmed)
390                } else {
391                    None
392                };
393                SearchResultSnapshot {
394                    query,
395                    scope_chat_id,
396                    contacts,
397                    groups,
398                    messages: enriched,
399                    shortcut,
400                }
401            },
402        )
403    }
404
405    /// Bounded chat projection for a route-selected chat. Unlike
406    /// `OpenChat`, this is a direct read from the shared state/SQLite
407    /// handle and never waits behind the core action queue. Shells use
408    /// it as the first paint for chat screens; the core still receives
409    /// `OpenChat` for unread clearing, subscriptions, and side effects.
410    pub fn chat_snapshot(&self, chat_id: String, limit: u32) -> Option<CurrentChatSnapshot> {
411        ffi_or("ffiapp.chat_snapshot", None, || {
412            let state_snapshot = match self.shared_state.read() {
413                Ok(slot) => slot.clone(),
414                Err(poison) => poison.into_inner().clone(),
415            };
416            crate::core::chat_snapshot_from_state_and_db(
417                &state_snapshot,
418                self.shared_db_snapshot().as_ref(),
419                &chat_id,
420                limit.max(1) as usize,
421            )
422        })
423    }
424
425    pub fn chat_snapshot_before(
426        &self,
427        chat_id: String,
428        before_message_id: String,
429        limit: u32,
430    ) -> Option<CurrentChatSnapshot> {
431        ffi_or("ffiapp.chat_snapshot_before", None, || {
432            let state_snapshot = match self.shared_state.read() {
433                Ok(slot) => slot.clone(),
434                Err(poison) => poison.into_inner().clone(),
435            };
436            crate::core::chat_snapshot_before_from_state_and_db(
437                &state_snapshot,
438                self.shared_db_snapshot().as_ref(),
439                &chat_id,
440                &before_message_id,
441                limit.max(1) as usize,
442            )
443        })
444    }
445
446    pub fn chat_snapshot_around_message(
447        &self,
448        chat_id: String,
449        message_id: String,
450        before_limit: u32,
451        after_limit: u32,
452    ) -> Option<CurrentChatSnapshot> {
453        ffi_or("ffiapp.chat_snapshot_around_message", None, || {
454            let state_snapshot = match self.shared_state.read() {
455                Ok(slot) => slot.clone(),
456                Err(poison) => poison.into_inner().clone(),
457            };
458            crate::core::chat_snapshot_around_message_from_state_and_db(
459                &state_snapshot,
460                self.shared_db_snapshot().as_ref(),
461                &chat_id,
462                &message_id,
463                before_limit as usize,
464                after_limit as usize,
465            )
466        })
467    }
468
469    pub fn ingest_nearby_event_json(&self, event_json: String) -> bool {
470        self.perf
471            .ingest_nearby_event_json
472            .fetch_add(1, Ordering::Relaxed);
473        self.ingest_nearby_event_json_with_transport(event_json, String::new())
474    }
475
476    pub fn ingest_nearby_event_json_with_transport(
477        &self,
478        event_json: String,
479        transport: String,
480    ) -> bool {
481        ffi_or("ffiapp.ingest_nearby_event_json", false, || {
482            let event = match serde_json::from_str::<nostr_sdk::prelude::Event>(&event_json) {
483                Ok(event) => event,
484                Err(_) => return false,
485            };
486            if event.verify().is_err() {
487                return false;
488            }
489            self.background_tx
490                .send(CoreMsg::Internal(Box::new(InternalEvent::NearbyEvent {
491                    event,
492                    transport,
493                })))
494                .is_ok()
495        })
496    }
497
498    pub fn build_nearby_presence_event_json(
499        &self,
500        peer_id: String,
501        my_nonce: String,
502        their_nonce: String,
503        profile_event_id: String,
504    ) -> String {
505        ffi_or(
506            "ffiapp.build_nearby_presence_event_json",
507            String::new(),
508            || {
509                let (reply_tx, reply_rx) = flume::bounded(1);
510                if self
511                    .background_tx
512                    .send(CoreMsg::BuildNearbyPresenceEvent {
513                        peer_id,
514                        my_nonce,
515                        their_nonce,
516                        profile_event_id,
517                        reply_tx,
518                    })
519                    .is_err()
520                {
521                    return String::new();
522                }
523                reply_rx
524                    .recv_timeout(Duration::from_secs(2))
525                    .unwrap_or_default()
526            },
527        )
528    }
529
530    pub fn verify_nearby_presence_event_json(
531        &self,
532        event_json: String,
533        peer_id: String,
534        my_nonce: String,
535        their_nonce: String,
536    ) -> String {
537        ffi_or(
538            "ffiapp.verify_nearby_presence_event_json",
539            String::new(),
540            || verify_nearby_presence_event_json(&event_json, &peer_id, &my_nonce, &their_nonce),
541        )
542    }
543
544    pub fn nearby_encode_frame(&self, envelope_json: String) -> Vec<u8> {
545        ffi_or("ffiapp.nearby_encode_frame", Vec::new(), || {
546            nostr_double_ratchet_runtime::encode_nearby_frame_json(&envelope_json)
547                .unwrap_or_default()
548        })
549    }
550
551    pub fn nearby_decode_frame(&self, frame: Vec<u8>) -> String {
552        ffi_or("ffiapp.nearby_decode_frame", String::new(), || {
553            nostr_double_ratchet_runtime::decode_nearby_frame_json(&frame).unwrap_or_default()
554        })
555    }
556
557    pub fn nearby_frame_body_len_from_header(&self, header: Vec<u8>) -> i32 {
558        ffi_or("ffiapp.nearby_frame_body_len_from_header", -1, || {
559            nostr_double_ratchet_runtime::nearby_frame_body_len_from_header(&header)
560                .and_then(|len| i32::try_from(len).ok())
561                .unwrap_or(-1)
562        })
563    }
564
565    pub fn export_support_bundle_json(&self) -> String {
566        self.perf
567            .export_support_bundle_json
568            .fetch_add(1, Ordering::Relaxed);
569        ffi_or(
570            "ffiapp.export_support_bundle_json",
571            self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true),
572            || {
573                let (reply_tx, reply_rx) = flume::bounded(1);
574                if self
575                    .foreground_tx
576                    .send(CoreMsg::ExportSupportBundle(reply_tx))
577                    .is_err()
578                {
579                    return self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true);
580                }
581                match reply_rx.recv_timeout(Duration::from_secs(2)) {
582                    Ok(json) => self.support_bundle_json_with_ffi_diagnostics(json, false),
583                    Err(_) => self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true),
584                }
585            },
586        )
587    }
588
589    pub fn peer_profile_debug(&self, owner_input: String) -> Option<PeerProfileDebugSnapshot> {
590        self.perf.peer_profile_debug.fetch_add(1, Ordering::Relaxed);
591        ffi_or("ffiapp.peer_profile_debug", None, || {
592            let (reply_tx, reply_rx) = flume::bounded(1);
593            if self
594                .foreground_tx
595                .send(CoreMsg::PeerProfileDebug {
596                    owner_input,
597                    reply_tx,
598                })
599                .is_err()
600            {
601                return None;
602            }
603            reply_rx.recv_timeout(Duration::from_secs(2)).ok().flatten()
604        })
605    }
606
607    pub fn mutual_groups(&self, owner_input: String) -> MutualGroupsSnapshot {
608        self.perf.mutual_groups.fetch_add(1, Ordering::Relaxed);
609        ffi_or(
610            "ffiapp.mutual_groups",
611            MutualGroupsSnapshot::default(),
612            || {
613                let (reply_tx, reply_rx) = flume::bounded(1);
614                if self
615                    .foreground_tx
616                    .send(CoreMsg::MutualGroups {
617                        owner_input,
618                        reply_tx,
619                    })
620                    .is_err()
621                {
622                    return MutualGroupsSnapshot::default();
623                }
624                reply_rx
625                    .recv_timeout(Duration::from_secs(2))
626                    .unwrap_or_default()
627            },
628        )
629    }
630
631    pub fn prepare_for_suspend(&self) {
632        self.perf
633            .prepare_for_suspend
634            .fetch_add(1, Ordering::Relaxed);
635        ffi_or("ffiapp.prepare_for_suspend", (), || {
636            let (reply_tx, reply_rx) = flume::bounded(1);
637            if self
638                .foreground_tx
639                .send(CoreMsg::PrepareForSuspend(reply_tx))
640                .is_err()
641            {
642                return;
643            }
644            let _ = reply_rx.recv_timeout(Duration::from_secs(2));
645        })
646    }
647
648    pub fn shutdown(&self) {
649        ffi_or("ffiapp.shutdown", (), || {
650            let (reply_tx, reply_rx) = flume::bounded(1);
651            if self
652                .foreground_tx
653                .send(CoreMsg::Shutdown(Some(reply_tx)))
654                .is_err()
655            {
656                return;
657            }
658            let _ = reply_rx.recv_timeout(Duration::from_secs(2));
659        })
660    }
661
662    fn support_bundle_json_with_ffi_diagnostics(
663        &self,
664        rust_json: String,
665        core_support_bundle_timed_out: bool,
666    ) -> String {
667        let mut object = serde_json::from_str::<serde_json::Value>(&rust_json)
668            .ok()
669            .and_then(|value| value.as_object().cloned())
670            .unwrap_or_default();
671        let now_ms = crate::perflog::now_ms();
672        let last_started_at_ms = self
673            .queue_metrics
674            .last_batch_started_at_ms
675            .load(Ordering::Relaxed);
676        let last_finished_at_ms = self
677            .queue_metrics
678            .last_batch_finished_at_ms
679            .load(Ordering::Relaxed);
680        let batch_active = self.queue_metrics.batch_active.load(Ordering::Acquire);
681        let active_batch_age_ms = if batch_active && last_started_at_ms > 0 {
682            Some(now_ms.saturating_sub(last_started_at_ms))
683        } else {
684            None
685        };
686        let last_batch_started_ago_ms = if last_started_at_ms > 0 {
687            Some(now_ms.saturating_sub(last_started_at_ms))
688        } else {
689            None
690        };
691        let last_batch_finished_ago_ms = if last_finished_at_ms > 0 {
692            Some(now_ms.saturating_sub(last_finished_at_ms))
693        } else {
694            None
695        };
696        object.insert(
697            "ffi_queue".to_string(),
698            serde_json::json!({
699                "core_support_bundle_timed_out": core_support_bundle_timed_out,
700                "foreground_pending": self.foreground_rx.len(),
701                "background_pending": self.background_rx.len(),
702                "foreground_processed": self.queue_metrics.foreground_processed.load(Ordering::Relaxed),
703                "background_processed": self.queue_metrics.background_processed.load(Ordering::Relaxed),
704                "batch_active": batch_active,
705                "active_batch_age_ms": active_batch_age_ms,
706                "last_batch_started_ago_ms": last_batch_started_ago_ms,
707                "last_batch_finished_ago_ms": last_batch_finished_ago_ms,
708                "last_batch_size": self.queue_metrics.last_batch_size.load(Ordering::Relaxed),
709                "last_batch_foreground_count": self.queue_metrics.last_batch_foreground_count.load(Ordering::Relaxed),
710                "last_batch_background_count": self.queue_metrics.last_batch_background_count.load(Ordering::Relaxed),
711                "core_restarts": self.recovery.restart_count(),
712                "last_core_panic": self.recovery.last_panic(),
713                "has_cached_restore_action": self.recovery.restore_action().is_some(),
714            }),
715        );
716        serde_json::Value::Object(object).to_string()
717    }
718
719    pub fn listen_for_updates(&self, reconciler: Box<dyn AppReconciler>) {
720        if self
721            .listening
722            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
723            .is_err()
724        {
725            return;
726        }
727
728        let update_rx = self.update_rx.clone();
729        let recovery = self.recovery.clone();
730        let spawn_result = thread::Builder::new()
731            .name("iris-updates".to_string())
732            .spawn(move || {
733                // Drain queued updates and deliver the latest FullState only.
734                // The shell side already discards FullStates with stale `rev`,
735                // but the JNI marshal of an AppState is itself ~20-30 ms and
736                // each push triggers a full Compose recomposition (~400 ms on
737                // Android debug). When the core emits a tight burst of 3-4
738                // updates (OpenChat → SyncComplete → FetchCatchUpEvents → …)
739                // the UI keeps re-rendering for seconds even though only the
740                // final state mattered.
741                //
742                // PersistAccountBundle is a side-effect (key persistence), not
743                // a UI update, so we never collapse those — every one must run.
744                while let Ok(first) = update_rx.recv() {
745                    let mut latest_full_state: Option<AppUpdate> = None;
746                    let mut before_full_state: Vec<AppUpdate> = Vec::new();
747                    let mut after_full_state: Vec<AppUpdate> = Vec::new();
748                    let process = |update: AppUpdate,
749                                   latest: &mut Option<AppUpdate>,
750                                   before: &mut Vec<AppUpdate>,
751                                   after: &mut Vec<AppUpdate>| {
752                        recovery.remember_update(&update);
753                        enqueue_update_for_delivery(update, latest, before, after);
754                    };
755                    process(
756                        first,
757                        &mut latest_full_state,
758                        &mut before_full_state,
759                        &mut after_full_state,
760                    );
761                    while let Ok(next) = update_rx.try_recv() {
762                        process(
763                            next,
764                            &mut latest_full_state,
765                            &mut before_full_state,
766                            &mut after_full_state,
767                        );
768                    }
769                    for update in before_full_state
770                        .into_iter()
771                        .chain(latest_full_state)
772                        .chain(after_full_state)
773                    {
774                        let kind = match &update {
775                            AppUpdate::FullState(_) => "FullState",
776                            AppUpdate::PersistAccountBundle { .. } => "PersistAccountBundle",
777                            AppUpdate::NearbyPublishedEvent { .. } => "NearbyPublishedEvent",
778                        };
779                        let t0 = crate::perflog::now_ms();
780                        crate::perflog!("reconcile.start kind={kind}");
781                        if panic::catch_unwind(AssertUnwindSafe(|| reconciler.reconcile(update)))
782                            .is_err()
783                        {
784                            crate::perflog!("reconcile.failed kind={kind}");
785                            continue;
786                        }
787                        crate::perflog!(
788                            "reconcile.end kind={kind} elapsed_ms={}",
789                            crate::perflog::now_ms().saturating_sub(t0)
790                        );
791                    }
792                }
793            });
794        if let Err(error) = spawn_result {
795            crate::perflog!("updates.spawn.failed error={error}");
796            self.listening.store(false, Ordering::SeqCst);
797        }
798    }
799}
800
801impl FfiApp {
802    fn shared_db_snapshot(&self) -> Option<crate::core::SharedConnection> {
803        match self.shared_db.read() {
804            Ok(slot) => slot.clone(),
805            Err(poison) => poison.into_inner().clone(),
806        }
807    }
808}
809
810#[uniffi::export]
811impl FfiDesktopNearby {
812    #[uniffi::constructor]
813    pub fn new(app: Arc<FfiApp>, observer: Box<dyn DesktopNearbyObserver>) -> Arc<Self> {
814        Arc::new(Self {
815            service: desktop_nearby::DesktopNearbyService::new(app, observer.into()),
816        })
817    }
818
819    pub fn start(&self, local_name: String) {
820        self.service.start(local_name);
821    }
822
823    pub fn stop(&self) {
824        self.service.stop();
825    }
826
827    pub fn snapshot(&self) -> DesktopNearbySnapshot {
828        self.service.snapshot()
829    }
830
831    pub fn publish(&self, event_id: String, kind: u32, created_at_secs: u64, event_json: String) {
832        self.service
833            .publish(event_id, kind, created_at_secs, event_json);
834    }
835}
836
837fn new_ffi_app_inner(data_dir: String) -> Arc<FfiApp> {
838    let (update_tx, update_rx) = flume::unbounded();
839    let (foreground_tx, foreground_rx) = flume::unbounded();
840    let (background_tx, background_rx) = flume::unbounded();
841    let shared_state = Arc::new(RwLock::new(AppState::empty()));
842    let queue_metrics = Arc::new(CoreQueueMetrics::default());
843    let recovery = Arc::new(CoreRecoveryState::default());
844    let shared_db = Arc::new(RwLock::new(None));
845
846    let update_tx_for_error = update_tx.clone();
847    match AppCore::try_new(
848        update_tx.clone(),
849        background_tx.clone(),
850        data_dir.clone(),
851        shared_state.clone(),
852    ) {
853        Ok(core) => {
854            set_shared_db(&shared_db, Some(core.shared_db()));
855            let spawn_result = spawn_core_supervisor(
856                core,
857                CoreSupervisor {
858                    data_dir,
859                    update_tx: update_tx.clone(),
860                    core_sender: background_tx.clone(),
861                    foreground_rx: foreground_rx.clone(),
862                    background_rx: background_rx.clone(),
863                    shared_state: shared_state.clone(),
864                    shared_db: shared_db.clone(),
865                    queue_metrics: queue_metrics.clone(),
866                    recovery: recovery.clone(),
867                },
868            );
869            if let Err(error) = spawn_result {
870                publish_core_failure_state(
871                    &shared_state,
872                    &update_tx_for_error,
873                    format!("Iris could not start: {error}"),
874                );
875            }
876        }
877        Err(error) => {
878            publish_core_failure_state(&shared_state, &update_tx_for_error, error.to_string());
879        }
880    }
881
882    Arc::new(FfiApp {
883        foreground_tx,
884        foreground_rx,
885        background_tx,
886        background_rx,
887        update_rx,
888        listening: AtomicBool::new(false),
889        shared_state,
890        shared_db,
891        perf: FfiPerfCounters::default(),
892        queue_metrics,
893        recovery,
894    })
895}
896
897fn ffi_app_failure(message: String) -> Arc<FfiApp> {
898    let (_update_tx, update_rx) = flume::unbounded();
899    let (foreground_tx, foreground_rx) = flume::unbounded();
900    let (background_tx, background_rx) = flume::unbounded();
901    let mut state = AppState::empty();
902    state.toast = Some(message);
903    state.rev = 1;
904    let shared_state = Arc::new(RwLock::new(state));
905    Arc::new(FfiApp {
906        foreground_tx,
907        foreground_rx,
908        background_tx,
909        background_rx,
910        update_rx,
911        listening: AtomicBool::new(false),
912        shared_state,
913        shared_db: Arc::new(RwLock::new(None)),
914        perf: FfiPerfCounters::default(),
915        queue_metrics: Arc::new(CoreQueueMetrics::default()),
916        recovery: Arc::new(CoreRecoveryState::default()),
917    })
918}
919
920struct CoreSupervisor {
921    data_dir: String,
922    update_tx: Sender<AppUpdate>,
923    core_sender: Sender<CoreMsg>,
924    foreground_rx: Receiver<CoreMsg>,
925    background_rx: Receiver<CoreMsg>,
926    shared_state: Arc<RwLock<AppState>>,
927    shared_db: Arc<RwLock<Option<crate::core::SharedConnection>>>,
928    queue_metrics: Arc<CoreQueueMetrics>,
929    recovery: Arc<CoreRecoveryState>,
930}
931
932fn spawn_core_supervisor(
933    core: AppCore,
934    supervisor: CoreSupervisor,
935) -> std::io::Result<thread::JoinHandle<()>> {
936    thread::Builder::new()
937        .name("iris-core".to_string())
938        .spawn(move || {
939            let mut core_slot = Some(core);
940            // User actions and synchronous shell requests must not sit behind
941            // relay/nearby backlog. The core keeps internal work on a
942            // background queue and drains it in bounded chunks between
943            // foreground batches.
944            while let Ok(batch) =
945                recv_core_batch(&supervisor.foreground_rx, &supervisor.background_rx)
946            {
947                let batch_size = batch.len();
948                let foreground_count = batch
949                    .iter()
950                    .filter(|msg| is_foreground_core_msg(msg))
951                    .count() as u64;
952                let background_count = batch_size as u64 - foreground_count;
953                supervisor.queue_metrics.mark_batch_start(
954                    batch_size as u64,
955                    foreground_count,
956                    background_count,
957                );
958                let t0 = crate::perflog::now_ms();
959                crate::perflog!("core.batch.start size={batch_size}");
960                let result = match core_slot.as_mut() {
961                    Some(core) => catch_core_batch(|| handle_core_batch_responsive(core, batch)),
962                    None => break,
963                };
964                let elapsed_ms = crate::perflog::now_ms().saturating_sub(t0);
965                supervisor
966                    .queue_metrics
967                    .mark_batch_finished(foreground_count, background_count);
968                match result {
969                    Ok(true) => {
970                        crate::perflog!(
971                            "core.batch.end size={batch_size} elapsed_ms={elapsed_ms}"
972                        );
973                    }
974                    Ok(false) => {
975                        crate::perflog!(
976                            "core.batch.end size={batch_size} elapsed_ms={elapsed_ms} result=shutdown"
977                        );
978                        break;
979                    }
980                    Err(error) => {
981                        if let Some(mut failed_core) = core_slot.take() {
982                            failed_core.record_core_panic(error.clone());
983                        }
984                        crate::perflog!(
985                            "core.batch.end size={batch_size} elapsed_ms={elapsed_ms} result=panic"
986                        );
987                        match recover_core_after_panic(&supervisor, error) {
988                            Some(core) => core_slot = Some(core),
989                            None => break,
990                        }
991                    }
992                }
993            }
994        })
995}
996
997fn recover_core_after_panic(supervisor: &CoreSupervisor, detail: String) -> Option<AppCore> {
998    let restart_count = supervisor.recovery.mark_panic(detail);
999    crate::perflog!("core.supervisor.restart count={restart_count}");
1000    set_shared_db(&supervisor.shared_db, None);
1001
1002    let mut core = match AppCore::try_new(
1003        supervisor.update_tx.clone(),
1004        supervisor.core_sender.clone(),
1005        supervisor.data_dir.clone(),
1006        supervisor.shared_state.clone(),
1007    ) {
1008        Ok(core) => core,
1009        Err(error) => {
1010            crate::perflog!("core.supervisor.restart.failed count={restart_count} error={error}");
1011            publish_core_failure_state(
1012                &supervisor.shared_state,
1013                &supervisor.update_tx,
1014                CORE_RESTART_TOAST.to_string(),
1015            );
1016            return None;
1017        }
1018    };
1019
1020    set_shared_db(&supervisor.shared_db, Some(core.shared_db()));
1021    if let Some(action) = supervisor.recovery.restore_action() {
1022        crate::perflog!(
1023            "core.supervisor.restore action={:?}",
1024            std::mem::discriminant(&action)
1025        );
1026        match catch_core_batch(|| core.handle_messages(vec![CoreMsg::Action(action)])) {
1027            Ok(true) => {}
1028            Ok(false) => {
1029                publish_core_failure_state(
1030                    &supervisor.shared_state,
1031                    &supervisor.update_tx,
1032                    CORE_RESTART_TOAST.to_string(),
1033                );
1034                return None;
1035            }
1036            Err(error) => {
1037                core.mark_core_panic(format!("core recovery restore panic: {error}"));
1038                return None;
1039            }
1040        }
1041    }
1042
1043    crate::perflog!("core.supervisor.recovered count={restart_count}");
1044    Some(core)
1045}
1046
1047fn set_shared_db(
1048    shared_db: &Arc<RwLock<Option<crate::core::SharedConnection>>>,
1049    value: Option<crate::core::SharedConnection>,
1050) {
1051    match shared_db.write() {
1052        Ok(mut slot) => *slot = value,
1053        Err(poison) => *poison.into_inner() = value,
1054    }
1055}
1056
1057fn publish_core_failure_state(
1058    shared_state: &Arc<RwLock<AppState>>,
1059    update_tx: &Sender<AppUpdate>,
1060    message: String,
1061) {
1062    let mut state = match shared_state.read() {
1063        Ok(slot) => slot.clone(),
1064        Err(poison) => poison.into_inner().clone(),
1065    };
1066    state.toast = Some(message);
1067    state.rev = state.rev.saturating_add(1).max(1);
1068    match shared_state.write() {
1069        Ok(mut slot) => *slot = state.clone(),
1070        Err(poison) => *poison.into_inner() = state.clone(),
1071    }
1072    let _ = update_tx.send(AppUpdate::FullState(state));
1073}
1074
/// Batch length at which `drain_foreground_messages` stops pulling from the
/// foreground queue; also the initial capacity of a foreground-led batch.
const CORE_FOREGROUND_BATCH_LIMIT: usize = 64;
/// Batch length at which `drain_core_batch_after_first` stops pulling from
/// the background queue; also the initial capacity of a background-led batch.
const CORE_BACKGROUND_BATCH_LIMIT: usize = 16;
1077
1078fn recv_core_batch(
1079    foreground_rx: &Receiver<CoreMsg>,
1080    background_rx: &Receiver<CoreMsg>,
1081) -> Result<Vec<CoreMsg>, flume::RecvError> {
1082    if let Some(batch) = try_recv_core_batch(foreground_rx, background_rx) {
1083        return Ok(batch);
1084    }
1085
1086    let (is_foreground, first) = flume::Selector::new()
1087        .recv(foreground_rx, |result| result.map(|msg| (true, msg)))
1088        .recv(background_rx, |result| result.map(|msg| (false, msg)))
1089        .wait()?;
1090    Ok(drain_core_batch_after_first(
1091        is_foreground,
1092        first,
1093        foreground_rx,
1094        background_rx,
1095    ))
1096}
1097
1098fn try_recv_core_batch(
1099    foreground_rx: &Receiver<CoreMsg>,
1100    background_rx: &Receiver<CoreMsg>,
1101) -> Option<Vec<CoreMsg>> {
1102    if let Ok(first) = foreground_rx.try_recv() {
1103        return Some(drain_core_batch_after_first(
1104            true,
1105            first,
1106            foreground_rx,
1107            background_rx,
1108        ));
1109    }
1110    background_rx
1111        .try_recv()
1112        .ok()
1113        .map(|first| drain_core_batch_after_first(false, first, foreground_rx, background_rx))
1114}
1115
1116fn drain_core_batch_after_first(
1117    is_foreground: bool,
1118    first: CoreMsg,
1119    foreground_rx: &Receiver<CoreMsg>,
1120    background_rx: &Receiver<CoreMsg>,
1121) -> Vec<CoreMsg> {
1122    let mut batch = Vec::with_capacity(if is_foreground {
1123        CORE_FOREGROUND_BATCH_LIMIT
1124    } else {
1125        CORE_BACKGROUND_BATCH_LIMIT
1126    });
1127    batch.push(first);
1128    drain_foreground_messages(&mut batch, foreground_rx);
1129    if batch.iter().any(is_foreground_core_msg) {
1130        return batch;
1131    }
1132
1133    while batch.len() < CORE_BACKGROUND_BATCH_LIMIT {
1134        let Ok(next) = background_rx.try_recv() else {
1135            break;
1136        };
1137        batch.push(next);
1138        drain_foreground_messages(&mut batch, foreground_rx);
1139        if batch.iter().any(is_foreground_core_msg) {
1140            break;
1141        }
1142    }
1143    batch
1144}
1145
1146fn drain_foreground_messages(batch: &mut Vec<CoreMsg>, foreground_rx: &Receiver<CoreMsg>) {
1147    while batch.len() < CORE_FOREGROUND_BATCH_LIMIT {
1148        let Ok(next) = foreground_rx.try_recv() else {
1149            break;
1150        };
1151        batch.push(next);
1152    }
1153}
1154
1155fn handle_core_batch_responsive(core: &mut AppCore, messages: Vec<CoreMsg>) -> bool {
1156    if messages.len() <= 1 || !messages.iter().any(is_foreground_core_msg) {
1157        return core.handle_messages(messages);
1158    }
1159
1160    let mut foreground = Vec::new();
1161    let mut background = Vec::new();
1162    for message in messages {
1163        if is_foreground_core_msg(&message) {
1164            foreground.push(message);
1165        } else {
1166            background.push(message);
1167        }
1168    }
1169
1170    // Each foreground message runs in its own batch — the user gets immediate
1171    // feedback per action, but a single action that cascades into multiple
1172    // engine.persist() calls (e.g. send → retry_pending_protocol → another
1173    // persist) coalesces them into one SQLite write.
1174    for message in foreground {
1175        if !core.handle_messages(vec![message]) {
1176            return false;
1177        }
1178    }
1179    background.is_empty() || core.handle_messages(background)
1180}
1181
1182fn catch_core_batch<F>(f: F) -> Result<bool, String>
1183where
1184    F: FnOnce() -> bool,
1185{
1186    panic::catch_unwind(AssertUnwindSafe(f)).map_err(panic_payload_to_string)
1187}
1188
1189fn ffi_or<T, F>(label: &'static str, fallback: T, f: F) -> T
1190where
1191    F: FnOnce() -> T,
1192{
1193    match panic::catch_unwind(AssertUnwindSafe(f)) {
1194        Ok(value) => value,
1195        Err(payload) => {
1196            crate::perflog!(
1197                "ffi.panic label={label} detail={}",
1198                panic_payload_to_string(payload)
1199            );
1200            fallback
1201        }
1202    }
1203}
1204
1205fn ffi_failure_state() -> AppState {
1206    let mut state = AppState::empty();
1207    state.toast = Some("Iris needs restart. Copy support bundle in Settings.".to_string());
1208    state
1209}
1210
1211fn suppressed_mobile_push_resolution() -> MobilePushNotificationResolution {
1212    MobilePushNotificationResolution {
1213        should_show: false,
1214        title: String::new(),
1215        body: String::new(),
1216        payload_json: "{}".to_string(),
1217    }
1218}
1219
/// Renders a panic payload as text.
///
/// `&str` and `String` payloads yield their message; any other payload type
/// yields `"unknown panic"`.
fn panic_payload_to_string(payload: Box<dyn Any + Send>) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|message| (*message).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic".to_string())
}
1229
1230fn is_foreground_core_msg(message: &CoreMsg) -> bool {
1231    !matches!(message, CoreMsg::Internal(_))
1232}
1233
#[cfg(test)]
mod core_queue_tests {
    use super::*;

    /// Builds an internal debug-log message whose detail is `index`.
    fn background_msg(index: usize) -> CoreMsg {
        let event = InternalEvent::DebugLog {
            category: "test.background".to_string(),
            detail: index.to_string(),
        };
        CoreMsg::Internal(Box::new(event))
    }

    /// A foreground action must be returned ahead of a background backlog
    /// and must not be bundled with it.
    #[test]
    fn foreground_queue_preempts_background_backlog() {
        let (foreground_tx, foreground_rx) = flume::unbounded();
        let (background_tx, background_rx) = flume::unbounded();
        (0..100).for_each(|index| background_tx.send(background_msg(index)).unwrap());
        let action = CoreMsg::Action(AppAction::NavigateBack);
        foreground_tx.send(action).unwrap();

        let batch = recv_core_batch(&foreground_rx, &background_rx).unwrap();

        let head = batch.first();
        assert!(matches!(
            head,
            Some(CoreMsg::Action(AppAction::NavigateBack))
        ));
        let only_foreground = batch.iter().all(is_foreground_core_msg);
        assert!(
            only_foreground,
            "foreground work should not be bundled behind background backlog"
        );
    }

    /// With no foreground traffic, a backlog is cut into chunks of
    /// `CORE_BACKGROUND_BATCH_LIMIT`.
    #[test]
    fn background_queue_drains_in_bounded_chunks() {
        let (_foreground_tx, foreground_rx) = flume::unbounded();
        let (background_tx, background_rx) = flume::unbounded();
        (0..100).for_each(|index| background_tx.send(background_msg(index)).unwrap());

        let batch = recv_core_batch(&foreground_rx, &background_rx).unwrap();

        assert_eq!(batch.len(), CORE_BACKGROUND_BATCH_LIMIT);
        let any_foreground = batch.iter().any(is_foreground_core_msg);
        assert!(!any_foreground);
    }

    /// Without a database, the snapshot is built from the chat list alone
    /// and carries no messages.
    #[test]
    fn route_chat_snapshot_uses_chat_list_without_core_queue() {
        let state = build_large_test_app_state(80, 20, 1_200);
        let expected = &state.chat_list[10];

        let snapshot =
            crate::core::chat_snapshot_from_state_and_db(&state, None, &expected.chat_id, 80)
                .unwrap();

        assert_eq!(snapshot.chat_id, expected.chat_id);
        assert_eq!(snapshot.display_name, expected.display_name);
        assert!(snapshot.messages.is_empty());
    }

    /// No snapshot is produced once the account is removed from the state.
    #[test]
    fn route_chat_snapshot_requires_account() {
        let mut state = build_large_test_app_state(80, 20, 1_200);
        state.account = None;

        let snapshot = crate::core::chat_snapshot_from_state_and_db(
            &state,
            None,
            &state.chat_list[10].chat_id,
            80,
        );

        assert!(snapshot.is_none());
    }
}
1304
1305fn filter_threads_for_search(
1306    chat_list: &[ChatThreadSnapshot],
1307    query: &str,
1308) -> (Vec<ChatThreadSnapshot>, Vec<ChatThreadSnapshot>) {
1309    let needle = query.to_lowercase();
1310    let mut contacts = Vec::new();
1311    let mut groups = Vec::new();
1312    for chat in chat_list {
1313        if !thread_matches_query(chat, &needle) {
1314            continue;
1315        }
1316        match chat.kind {
1317            ChatKind::Direct => contacts.push(chat.clone()),
1318            ChatKind::Group => groups.push(chat.clone()),
1319        }
1320    }
1321    (contacts, groups)
1322}
1323
1324fn thread_matches_query(chat: &ChatThreadSnapshot, needle_lower: &str) -> bool {
1325    let candidates: [&str; 7] = [
1326        &chat.display_name,
1327        chat.nickname.as_deref().unwrap_or(""),
1328        chat.profile_name.as_deref().unwrap_or(""),
1329        chat.about.as_deref().unwrap_or(""),
1330        chat.subtitle.as_deref().unwrap_or(""),
1331        &chat.draft,
1332        &chat.chat_id,
1333    ];
1334    candidates
1335        .iter()
1336        .any(|field| field.to_lowercase().contains(needle_lower))
1337}
1338
1339fn enrich_message_hits(
1340    hits: Vec<crate::core::PersistedMessageSearchHit>,
1341    chat_list: &[ChatThreadSnapshot],
1342) -> Vec<MessageSearchHit> {
1343    use std::collections::HashMap;
1344    let lookup: HashMap<&str, &ChatThreadSnapshot> = chat_list
1345        .iter()
1346        .map(|chat| (chat.chat_id.as_str(), chat))
1347        .collect();
1348    hits.into_iter()
1349        .map(|hit| {
1350            let parent = lookup.get(hit.chat_id.as_str());
1351            let display_name = parent
1352                .map(|chat| chat.display_name.clone())
1353                .unwrap_or_else(|| short_chat_label(&hit.chat_id));
1354            let picture_url = parent.and_then(|chat| chat.picture_url.clone());
1355            let kind = parent
1356                .map(|chat| chat.kind.clone())
1357                .unwrap_or(ChatKind::Direct);
1358            MessageSearchHit {
1359                chat_id: hit.chat_id,
1360                message_id: hit.message_id,
1361                chat_display_name: display_name,
1362                chat_picture_url: picture_url,
1363                chat_kind: kind,
1364                author_pubkey: hit.author,
1365                body: hit.body,
1366                is_outgoing: hit.is_outgoing,
1367                created_at_secs: hit.created_at_secs,
1368            }
1369        })
1370        .collect()
1371}
1372
/// Shortens a chat id for display.
///
/// The id is trimmed; if it is longer than 12 bytes it is cut to at most 12
/// bytes and an ellipsis is appended. The cut backs off to the nearest
/// character boundary, so ids containing multi-byte characters are
/// shortened instead of panicking on a mid-character slice.
fn short_chat_label(chat_id: &str) -> String {
    const MAX_LABEL_BYTES: usize = 12;

    let trimmed = chat_id.trim();
    if trimmed.len() <= MAX_LABEL_BYTES {
        return trimmed.to_string();
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut end = MAX_LABEL_BYTES;
    while !trimmed.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &trimmed[..end])
}
1381
1382fn verify_nearby_presence_event_json(
1383    event_json: &str,
1384    peer_id: &str,
1385    my_nonce: &str,
1386    their_nonce: &str,
1387) -> String {
1388    let Ok(event) = serde_json::from_str::<nostr_sdk::prelude::Event>(event_json) else {
1389        return String::new();
1390    };
1391    if event.verify().is_err() || event.kind.as_u16() != crate::core::NEARBY_PRESENCE_KIND {
1392        return String::new();
1393    }
1394    let Ok(content) = serde_json::from_str::<serde_json::Value>(&event.content) else {
1395        return String::new();
1396    };
1397    let get = |key: &str| {
1398        content
1399            .get(key)
1400            .and_then(|value| value.as_str())
1401            .unwrap_or("")
1402    };
1403    let transport = get("transport");
1404    if get("protocol") != "iris-nearby-v1"
1405        || !(transport == "ble" || transport == "nearby" || transport == "lan")
1406        || get("peer_id") != peer_id.trim()
1407        || get("my_nonce") != their_nonce.trim()
1408        || get("their_nonce") != my_nonce.trim()
1409    {
1410        return String::new();
1411    }
1412
1413    let now = SystemTime::now()
1414        .duration_since(UNIX_EPOCH)
1415        .unwrap_or_default()
1416        .as_secs();
1417    let expires_at = content
1418        .get("expires_at")
1419        .and_then(|value| value.as_u64())
1420        .unwrap_or(0);
1421    let created_at = event.created_at.as_secs();
1422    if expires_at < now
1423        || expires_at > now.saturating_add(300)
1424        || created_at.saturating_add(300) < now
1425        || created_at > now.saturating_add(300)
1426    {
1427        return String::new();
1428    }
1429
1430    let profile_event_id = get("profile_event_id");
1431    let profile_event_id = if profile_event_id.len() == 64 {
1432        profile_event_id
1433    } else {
1434        ""
1435    };
1436    serde_json::json!({
1437        "owner_pubkey_hex": event.pubkey.to_hex(),
1438        "profile_event_id": profile_event_id,
1439    })
1440    .to_string()
1441}
1442
1443impl Drop for FfiApp {
1444    fn drop(&mut self) {
1445        let _ = self.foreground_tx.send(CoreMsg::Shutdown(None));
1446    }
1447}
1448
1449#[uniffi::export]
1450pub fn normalize_peer_input(input: String) -> String {
1451    ffi_or("normalize_peer_input", String::new(), || {
1452        crate::core::normalize_peer_input_for_display(&input)
1453    })
1454}
1455
1456#[uniffi::export]
1457pub fn is_valid_peer_input(input: String) -> bool {
1458    ffi_or("is_valid_peer_input", false, || {
1459        crate::core::parse_peer_input(&input).is_ok()
1460    })
1461}
1462
1463/// Single source of truth for "is this typed text an npub or an
1464/// invite URL?". Used by the New Chat paste field, the chat-list
1465/// search bar, and the deep-link handler so all three branches agree
1466/// on what counts as a chat-opening shortcut. Returns `None` for
1467/// regular search-style text.
1468#[uniffi::export]
1469pub fn classify_chat_input(input: String) -> Option<ChatInputShortcut> {
1470    ffi_or("classify_chat_input", None, || chat_input_shortcut(&input))
1471}
1472
1473fn chat_input_shortcut(raw: &str) -> Option<ChatInputShortcut> {
1474    let trimmed = raw.trim();
1475    if trimmed.is_empty() {
1476        return None;
1477    }
1478    let lower = trimmed.to_lowercase();
1479    if lower.contains("://") && lower.contains('#') {
1480        return Some(ChatInputShortcut::Invite {
1481            invite_input: trimmed.to_string(),
1482            display: short_invite_display(trimmed),
1483        });
1484    }
1485    if crate::core::parse_peer_input(trimmed).is_ok() {
1486        use nostr::nips::nip19::ToBech32;
1487        let normalized = crate::core::normalize_peer_input_for_display(trimmed);
1488        if let Ok(pubkey) = nostr::PublicKey::parse(&normalized) {
1489            let npub = pubkey.to_bech32().unwrap_or_else(|_| normalized.clone());
1490            let display = short_npub_display(&npub);
1491            return Some(ChatInputShortcut::DirectPeer {
1492                peer_input: normalized,
1493                display,
1494                npub,
1495                pubkey_hex: pubkey.to_hex(),
1496            });
1497        }
1498    }
1499    None
1500}
1501
/// Abbreviates an npub for display: strings longer than 16 bytes become
/// their first 10 bytes, an ellipsis, and their last 4 bytes; shorter ones
/// are returned unchanged.
fn short_npub_display(npub: &str) -> String {
    if npub.len() <= 16 {
        return npub.to_string();
    }
    let head = &npub[..10];
    let tail = &npub[npub.len() - 4..];
    format!("{head}…{tail}")
}
1509
/// Shortens an invite link for display.
///
/// Everything up to and including the first `://` is dropped, so the row
/// reads "iris.to/invite/…" rather than a 120-char URL; the bech32 payload
/// is not parsed because the visible host is enough context. The remainder
/// is cut to at most 32 bytes plus an ellipsis. The cut backs off to the
/// nearest character boundary, so pasted text with multi-byte characters is
/// shortened instead of panicking on a mid-character slice.
fn short_invite_display(invite: &str) -> String {
    const MAX_DISPLAY_BYTES: usize = 32;

    let after_scheme = invite
        .split_once("://")
        .map_or(invite, |(_, rest)| rest);
    if after_scheme.len() <= MAX_DISPLAY_BYTES {
        return after_scheme.to_string();
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut end = MAX_DISPLAY_BYTES;
    while !after_scheme.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &after_scheme[..end])
}
1524
1525/// Convert any pubkey-shaped input (hex, npub, nprofile, …) to its
1526/// canonical lowercase-hex form. The empty string is returned when the
1527/// input can't be parsed as a public key — callers expecting hex
1528/// downstream can short-circuit on that.
1529#[uniffi::export]
1530pub fn peer_input_to_hex(input: String) -> String {
1531    ffi_or("peer_input_to_hex", String::new(), || {
1532        let normalized = crate::core::normalize_peer_input_for_display(&input);
1533        match nostr::PublicKey::parse(&normalized) {
1534            Ok(pubkey) => pubkey.to_hex(),
1535            Err(_) => String::new(),
1536        }
1537    })
1538}
1539
1540/// Convert any pubkey-shaped input (hex, npub, nprofile, …) to its npub form.
1541/// Returns the original string when it can't be parsed as a public key.
1542#[uniffi::export]
1543pub fn peer_input_to_npub(input: String) -> String {
1544    ffi_or("peer_input_to_npub", String::new(), || {
1545        use nostr::nips::nip19::ToBech32;
1546        let normalized = crate::core::normalize_peer_input_for_display(&input);
1547        match nostr::PublicKey::parse(&normalized) {
1548            Ok(pubkey) => pubkey.to_bech32().unwrap_or(normalized),
1549            Err(_) => normalized,
1550        }
1551    })
1552}
1553
1554#[uniffi::export]
1555pub fn build_summary() -> String {
1556    ffi_or("build_summary", String::new(), crate::core::build_summary)
1557}
1558
1559#[uniffi::export]
1560pub fn relay_set_id() -> String {
1561    ffi_or("relay_set_id", String::new(), || {
1562        crate::core::relay_set_id().to_string()
1563    })
1564}
1565
1566#[uniffi::export]
1567pub fn proxied_image_url(
1568    original_src: String,
1569    preferences: PreferencesSnapshot,
1570    width: Option<u32>,
1571    height: Option<u32>,
1572    square: bool,
1573) -> String {
1574    ffi_or("proxied_image_url", original_src.clone(), || {
1575        image_proxy::proxied_image_url(&original_src, &preferences, width, height, square)
1576    })
1577}
1578
1579#[uniffi::export]
1580pub fn is_trusted_test_build() -> bool {
1581    ffi_or(
1582        "is_trusted_test_build",
1583        false,
1584        crate::core::trusted_test_build_flag,
1585    )
1586}
1587
1588/// Marketing version baked in at build time from `IRIS_APP_VERSION_NAME`
1589/// (or `IRIS_APP_VERSION`), falling back to the crate semver. Use this
1590/// instead of `env!("CARGO_PKG_VERSION")` so UI/release artifacts agree
1591/// on a single version string.
1592#[uniffi::export]
1593pub fn app_version() -> String {
1594    crate::core::app_version_string().to_string()
1595}
1596
1597#[uniffi::export]
1598pub fn resolve_mobile_push_notification_payload(
1599    raw_payload_json: String,
1600) -> MobilePushNotificationResolution {
1601    ffi_or(
1602        "resolve_mobile_push_notification_payload",
1603        suppressed_mobile_push_resolution(),
1604        || crate::core::resolve_mobile_push_notification(raw_payload_json),
1605    )
1606}
1607
1608/// Decrypt a notification payload against the persisted double-ratchet
1609/// state under `data_dir`. Use from the FCM service (Android) or
1610/// Notification Service Extension (iOS) where there's no live `FfiApp`.
1611/// Falls back to the generic resolver when keys, payload, or storage
1612/// are unavailable so the user still gets *some* notification.
1613#[uniffi::export]
1614pub fn decrypt_mobile_push_notification_payload(
1615    data_dir: String,
1616    owner_pubkey_hex: String,
1617    device_nsec: String,
1618    raw_payload_json: String,
1619) -> MobilePushNotificationResolution {
1620    ffi_or(
1621        "decrypt_mobile_push_notification_payload",
1622        suppressed_mobile_push_resolution(),
1623        || {
1624            crate::core::decrypt_mobile_push_notification(
1625                data_dir,
1626                owner_pubkey_hex,
1627                device_nsec,
1628                raw_payload_json,
1629            )
1630        },
1631    )
1632}
1633
1634pub use crate::core::notifications::NotificationCandidate;
1635
1636/// Compute the list of chats that should raise a notification given two
1637/// successive `AppState` chat-list snapshots. Single source of truth for
1638/// suppression — chat muted, chat open with app foregrounded, outgoing
1639/// last message, no unread increase, or global pref off. Use this from
1640/// Android, macOS, Linux, and Windows. iOS APNS uses a separate path
1641/// that cannot suppress until Apple grants the filtering entitlement.
1642#[uniffi::export]
1643pub fn decide_pending_notifications(
1644    previous_chats: Vec<ChatThreadSnapshot>,
1645    next_chats: Vec<ChatThreadSnapshot>,
1646    preferences: PreferencesSnapshot,
1647    app_foreground: bool,
1648    open_chat_id: Option<String>,
1649) -> Vec<NotificationCandidate> {
1650    crate::core::notifications::decide_notifications(
1651        &previous_chats,
1652        &next_chats,
1653        &preferences,
1654        app_foreground,
1655        open_chat_id.as_deref(),
1656    )
1657}
1658
1659/// Pull the open chat id out of a `Router`, falling back to its default
1660/// screen. Shells normally just call this with `state.router` so they
1661/// don't each reimplement the `Screen::Chat { chat_id }` extraction.
1662#[uniffi::export]
1663pub fn router_open_chat_id(router: Router) -> Option<String> {
1664    crate::core::notifications::active_chat_id(&router)
1665}
1666
1667#[uniffi::export]
1668pub fn resolve_mobile_push_subscription_server_url(
1669    platform_key: String,
1670    is_release: bool,
1671    override_url: Option<String>,
1672) -> String {
1673    ffi_or(
1674        "resolve_mobile_push_subscription_server_url",
1675        String::new(),
1676        || crate::core::resolve_mobile_push_server_url(platform_key, is_release, override_url),
1677    )
1678}
1679
1680#[uniffi::export]
1681pub fn mobile_push_subscription_id_key(platform_key: String) -> String {
1682    ffi_or("mobile_push_subscription_id_key", String::new(), || {
1683        crate::core::mobile_push_stored_subscription_id_key(platform_key)
1684    })
1685}
1686
1687#[uniffi::export]
1688pub fn build_mobile_push_list_subscriptions_request(
1689    owner_nsec: String,
1690    platform_key: String,
1691    is_release: bool,
1692    server_url_override: Option<String>,
1693) -> Option<MobilePushSubscriptionRequest> {
1694    ffi_or("build_mobile_push_list_subscriptions_request", None, || {
1695        crate::core::build_mobile_push_list_subscriptions_request(
1696            owner_nsec,
1697            platform_key,
1698            is_release,
1699            server_url_override,
1700        )
1701    })
1702}
1703
1704#[uniffi::export]
1705#[allow(clippy::too_many_arguments)]
1706pub fn build_mobile_push_create_subscription_request(
1707    owner_nsec: String,
1708    platform_key: String,
1709    push_token: String,
1710    apns_topic: Option<String>,
1711    message_author_pubkeys: Vec<String>,
1712    invite_response_pubkeys: Vec<String>,
1713    is_release: bool,
1714    server_url_override: Option<String>,
1715) -> Option<MobilePushSubscriptionRequest> {
1716    ffi_or(
1717        "build_mobile_push_create_subscription_request",
1718        None,
1719        || {
1720            crate::core::build_mobile_push_create_subscription_request(
1721                owner_nsec,
1722                platform_key,
1723                push_token,
1724                apns_topic,
1725                message_author_pubkeys,
1726                invite_response_pubkeys,
1727                is_release,
1728                server_url_override,
1729            )
1730        },
1731    )
1732}
1733
1734#[uniffi::export]
1735#[allow(clippy::too_many_arguments)]
1736pub fn build_mobile_push_update_subscription_request(
1737    owner_nsec: String,
1738    subscription_id: String,
1739    platform_key: String,
1740    push_token: String,
1741    apns_topic: Option<String>,
1742    message_author_pubkeys: Vec<String>,
1743    invite_response_pubkeys: Vec<String>,
1744    is_release: bool,
1745    server_url_override: Option<String>,
1746) -> Option<MobilePushSubscriptionRequest> {
1747    ffi_or(
1748        "build_mobile_push_update_subscription_request",
1749        None,
1750        || {
1751            crate::core::build_mobile_push_update_subscription_request(
1752                owner_nsec,
1753                subscription_id,
1754                platform_key,
1755                push_token,
1756                apns_topic,
1757                message_author_pubkeys,
1758                invite_response_pubkeys,
1759                is_release,
1760                server_url_override,
1761            )
1762        },
1763    )
1764}
1765
1766#[uniffi::export]
1767pub fn build_mobile_push_delete_subscription_request(
1768    owner_nsec: String,
1769    subscription_id: String,
1770    platform_key: String,
1771    is_release: bool,
1772    server_url_override: Option<String>,
1773) -> Option<MobilePushSubscriptionRequest> {
1774    ffi_or(
1775        "build_mobile_push_delete_subscription_request",
1776        None,
1777        || {
1778            crate::core::build_mobile_push_delete_subscription_request(
1779                owner_nsec,
1780                subscription_id,
1781                platform_key,
1782                is_release,
1783                server_url_override,
1784            )
1785        },
1786    )
1787}
1788
#[cfg(test)]
mod ffi_hardening_tests {
    use super::*;

    /// A panic inside the guarded closure yields the supplied fallback.
    #[test]
    fn ffi_guard_returns_fallback_after_panic() {
        let fallback: i32 = 42;

        let outcome = ffi_or("test.panic", fallback, || -> i32 { panic!("ffi boom") });

        assert_eq!(outcome, fallback);
    }

    /// A panicking batch is reported as `Err` carrying the panic message.
    #[test]
    fn core_batch_guard_converts_panic_to_error() {
        let outcome = catch_core_batch(|| -> bool { panic!("batch boom") });

        assert_eq!(outcome, Err(String::from("batch boom")));
    }

    /// A batch that does not panic has its return value passed through as `Ok`.
    #[test]
    fn core_batch_guard_preserves_success_result() {
        let succeeded = catch_core_batch(|| true);
        let declined = catch_core_batch(|| false);

        assert_eq!(succeeded, Ok(true));
        assert_eq!(declined, Ok(false));
    }

    /// A remembered `RestoreSession` is offered as the restore action until a
    /// `Logout` is remembered.
    #[test]
    fn recovery_state_tracks_restore_action_and_logout() {
        let tracker = CoreRecoveryState::default();
        let restore = AppAction::RestoreSession {
            owner_nsec: String::from("secret"),
        };
        tracker.remember_action(&restore);

        match tracker.restore_action() {
            Some(AppAction::RestoreSession { owner_nsec }) => assert_eq!(owner_nsec, "secret"),
            other => panic!("unexpected restore action: {other:?}"),
        }

        tracker.remember_action(&AppAction::Logout);
        let after_logout = tracker.restore_action();
        assert!(after_logout.is_none());
    }

    /// A remembered `PersistAccountBundle` update is turned into a matching
    /// `RestoreAccountBundle` action.
    #[test]
    fn recovery_state_tracks_persisted_account_bundle() {
        let tracker = CoreRecoveryState::default();
        let persisted = AppUpdate::PersistAccountBundle {
            rev: 7,
            owner_nsec: None,
            owner_pubkey_hex: String::from("owner"),
            device_nsec: String::from("device-secret"),
        };
        tracker.remember_update(&persisted);

        match tracker.restore_action() {
            Some(AppAction::RestoreAccountBundle {
                owner_nsec,
                owner_pubkey_hex,
                device_nsec,
            }) => {
                assert_eq!(owner_nsec, None);
                assert_eq!(owner_pubkey_hex, "owner");
                assert_eq!(device_nsec, "device-secret");
            }
            other => panic!("unexpected restore action: {other:?}"),
        }
    }

    /// Within one drained batch only the newest full state is kept, other
    /// updates are delivered before it, and nearby events after it.
    #[test]
    fn nearby_published_events_wait_behind_latest_state_in_drained_batch() {
        /// Short tag identifying an update in the expected delivery order.
        fn label(update: AppUpdate) -> String {
            match update {
                AppUpdate::PersistAccountBundle { .. } => "persist".to_string(),
                AppUpdate::FullState(state) => format!("state:{}", state.rev),
                AppUpdate::NearbyPublishedEvent { .. } => "nearby".to_string(),
            }
        }

        let mut stale = AppState::empty();
        stale.rev = 1;
        let mut latest = AppState::empty();
        latest.rev = 3;

        // Arrival order: nearby event, stale state, persist, newest state.
        let incoming = vec![
            AppUpdate::NearbyPublishedEvent {
                event_id: "a".repeat(64),
                kind: 14,
                created_at_secs: 1,
                event_json: "{}".to_string(),
            },
            AppUpdate::FullState(stale),
            AppUpdate::PersistAccountBundle {
                rev: 2,
                owner_nsec: None,
                owner_pubkey_hex: "owner".to_string(),
                device_nsec: "device".to_string(),
            },
            AppUpdate::FullState(latest),
        ];

        let mut latest_full_state = None;
        let mut before_full_state = Vec::new();
        let mut after_full_state = Vec::new();
        for update in incoming {
            enqueue_update_for_delivery(
                update,
                &mut latest_full_state,
                &mut before_full_state,
                &mut after_full_state,
            );
        }

        let order: Vec<String> = before_full_state
            .into_iter()
            .chain(latest_full_state)
            .chain(after_full_state)
            .map(label)
            .collect();

        assert_eq!(order, vec!["persist", "state:3", "nearby"]);
    }

    /// After a forced batch panic the supervisor restarts the core exactly
    /// once and the restarted core still answers requests.
    #[test]
    fn core_supervisor_recovers_after_batch_panic() {
        // Keep the temp dir binding alive for the whole test.
        let temp_dir = tempfile::TempDir::new().expect("temp dir");
        let data_dir = temp_dir.path().to_string_lossy().into_owned();
        let app = new_ffi_app_inner(data_dir);

        app.foreground_tx
            .send(CoreMsg::PanicForTest)
            .expect("send test panic");

        // Poll for the restart: at most 40 checks, 25 ms apart.
        let mut polls = 0;
        while polls < 40 && app.recovery.restart_count() == 0 {
            thread::sleep(Duration::from_millis(25));
            polls += 1;
        }
        assert_eq!(app.recovery.restart_count(), 1);

        let (reply_tx, reply_rx) = flume::bounded(1);
        app.foreground_tx
            .send(CoreMsg::CorePerfCounters(reply_tx))
            .expect("send post-recovery request");
        let reply = reply_rx.recv_timeout(Duration::from_secs(2));
        assert!(reply.is_ok());

        app.shutdown();
    }
}