Skip to main content

iris_chat_core/
lib.rs

1mod actions;
2mod core;
3pub mod desktop_nearby;
4pub mod image_proxy;
5pub mod local_relay;
6pub mod perflog;
7mod qr;
8mod state;
9mod test_fixtures;
10pub mod update_policy;
11mod updates;
12
13use std::any::Any;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, RwLock};
16use std::thread;
17use std::time::{Duration, SystemTime, UNIX_EPOCH};
18use std::{panic, panic::AssertUnwindSafe};
19
20use flume::{Receiver, Sender};
21
22pub use actions::AppAction;
23pub use qr::*;
24pub use state::*;
25pub use test_fixtures::*;
26pub use update_policy::UpdateAutoCheckPolicy;
27pub use updates::*;
28
29use crate::core::AppCore;
30
31uniffi::setup_scaffolding!();
32
33pub(crate) const CORE_RESTART_TOAST: &str = "Iris needs restart. Copy support bundle in Settings.";
34
35fn enqueue_update_for_delivery(
36    update: AppUpdate,
37    latest_full_state: &mut Option<AppUpdate>,
38    before_full_state: &mut Vec<AppUpdate>,
39    after_full_state: &mut Vec<AppUpdate>,
40) {
41    match update {
42        full @ AppUpdate::FullState(_) => *latest_full_state = Some(full),
43        nearby @ AppUpdate::NearbyPublishedEvent { .. } => after_full_state.push(nearby),
44        other => before_full_state.push(other),
45    }
46}
47
48#[uniffi::export(callback_interface)]
49pub trait AppReconciler: Send + Sync + 'static {
50    fn reconcile(&self, update: AppUpdate);
51}
52
53#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq)]
54pub struct DesktopNearbyPeerSnapshot {
55    pub id: String,
56    pub name: String,
57    pub owner_pubkey_hex: Option<String>,
58    pub picture_url: Option<String>,
59    pub profile_event_id: Option<String>,
60    pub last_seen_secs: u64,
61}
62
63#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq)]
64pub struct DesktopNearbySnapshot {
65    pub visible: bool,
66    pub status: String,
67    pub peers: Vec<DesktopNearbyPeerSnapshot>,
68}
69
70#[uniffi::export(callback_interface)]
71pub trait DesktopNearbyObserver: Send + Sync + 'static {
72    fn desktop_nearby_changed(&self, snapshot: DesktopNearbySnapshot);
73}
74
75/// Per-FFI-method call counters that feed the release-gate budget
76/// tests. Every `FfiApp::*` entry point bumps the matching atomic at
77/// the top of the call so a misbehaving shell that re-enters the
78/// core in a hot loop shows up as an obvious counter spike.
79///
80/// We track FFI surface area (not internal core counters) because
81/// the categorical heat bugs we hit have all been "the shell
82/// re-evaluated something and called us N times instead of once" —
83/// e.g. the iOS chat-list search re-firing on every body re-eval.
84/// Adding finer-grained counters inside the core is a future move
85/// if needed; the FFI line is what we can confidently budget today
86/// because each entry corresponds to one observable shell action.
87#[derive(Default, Debug)]
88pub(crate) struct FfiPerfCounters {
89    pub state: AtomicU64,
90    pub dispatch: AtomicU64,
91    pub search: AtomicU64,
92    pub ingest_nearby_event_json: AtomicU64,
93    pub export_support_bundle_json: AtomicU64,
94    pub peer_profile_debug: AtomicU64,
95    pub mutual_groups: AtomicU64,
96    pub prepare_for_suspend: AtomicU64,
97}
98
99#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq, Default)]
100pub struct FfiPerfCountersSnapshot {
101    pub state: u64,
102    pub dispatch: u64,
103    pub search: u64,
104    pub ingest_nearby_event_json: u64,
105    pub export_support_bundle_json: u64,
106    pub peer_profile_debug: u64,
107    pub mutual_groups: u64,
108    pub prepare_for_suspend: u64,
109}
110
111/// Core-internal hot-loop counters. FFI surface counters can only
112/// catch shells that re-enter the core in a loop — the
113/// build_runtime_debug_snapshot regression that caused the macOS CPU
114/// loop was entirely internal (relay events fanned out through
115/// `persist_best_effort_inner` → `persist_debug_snapshot_best_effort`
116/// → full SessionManager clone × N known users). The release-gate
117/// budget tests assert on this snapshot too so the next time core
118/// work explodes per event, CI catches it before a device does.
119#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq, Default)]
120pub struct CorePerfCountersSnapshot {
121    pub debug_snapshot_builds: u64,
122}
123
124#[derive(uniffi::Object)]
125pub struct FfiApp {
126    foreground_tx: Sender<CoreMsg>,
127    foreground_rx: Receiver<CoreMsg>,
128    background_tx: Sender<CoreMsg>,
129    background_rx: Receiver<CoreMsg>,
130    update_rx: Receiver<AppUpdate>,
131    listening: AtomicBool,
132    shared_state: Arc<RwLock<AppState>>,
133    /// Shared SQLite handle used by direct read FFI calls. The core
134    /// supervisor swaps this when it recreates `AppCore` after a panic.
135    shared_db: Arc<RwLock<Option<crate::core::SharedConnection>>>,
136    perf: FfiPerfCounters,
137    queue_metrics: Arc<CoreQueueMetrics>,
138    recovery: Arc<CoreRecoveryState>,
139}
140
141#[derive(Default, Debug)]
142struct CoreQueueMetrics {
143    foreground_processed: AtomicU64,
144    background_processed: AtomicU64,
145    batch_active: AtomicBool,
146    last_batch_started_at_ms: AtomicU64,
147    last_batch_finished_at_ms: AtomicU64,
148    last_batch_size: AtomicU64,
149    last_batch_foreground_count: AtomicU64,
150    last_batch_background_count: AtomicU64,
151}
152
153impl CoreQueueMetrics {
154    fn mark_batch_start(&self, size: u64, foreground: u64, background: u64) {
155        self.last_batch_started_at_ms
156            .store(crate::perflog::now_ms(), Ordering::Relaxed);
157        self.last_batch_size.store(size, Ordering::Relaxed);
158        self.last_batch_foreground_count
159            .store(foreground, Ordering::Relaxed);
160        self.last_batch_background_count
161            .store(background, Ordering::Relaxed);
162        self.batch_active.store(true, Ordering::Release);
163    }
164
165    fn mark_batch_finished(&self, foreground: u64, background: u64) {
166        self.foreground_processed
167            .fetch_add(foreground, Ordering::Relaxed);
168        self.background_processed
169            .fetch_add(background, Ordering::Relaxed);
170        self.last_batch_finished_at_ms
171            .store(crate::perflog::now_ms(), Ordering::Relaxed);
172        self.batch_active.store(false, Ordering::Release);
173    }
174}
175
176#[derive(Default, Debug)]
177struct CoreRecoveryState {
178    restore_action: RwLock<Option<AppAction>>,
179    restart_count: AtomicU64,
180    last_panic: RwLock<Option<String>>,
181}
182
183impl CoreRecoveryState {
184    fn remember_action(&self, action: &AppAction) {
185        match action {
186            AppAction::RestoreSession { .. } | AppAction::RestoreAccountBundle { .. } => {
187                self.set_restore_action(Some(action.clone()));
188            }
189            AppAction::Logout => self.set_restore_action(None),
190            _ => {}
191        }
192    }
193
194    fn remember_update(&self, update: &AppUpdate) {
195        if let AppUpdate::PersistAccountBundle {
196            owner_nsec,
197            owner_pubkey_hex,
198            device_nsec,
199            ..
200        } = update
201        {
202            self.set_restore_action(Some(AppAction::RestoreAccountBundle {
203                owner_nsec: owner_nsec.clone(),
204                owner_pubkey_hex: owner_pubkey_hex.clone(),
205                device_nsec: device_nsec.clone(),
206            }));
207        }
208    }
209
210    fn restore_action(&self) -> Option<AppAction> {
211        match self.restore_action.read() {
212            Ok(action) => action.clone(),
213            Err(poison) => poison.into_inner().clone(),
214        }
215    }
216
217    fn mark_panic(&self, detail: String) -> u64 {
218        match self.last_panic.write() {
219            Ok(mut slot) => *slot = Some(detail),
220            Err(poison) => *poison.into_inner() = Some(detail),
221        }
222        self.restart_count.fetch_add(1, Ordering::Relaxed) + 1
223    }
224
225    fn restart_count(&self) -> u64 {
226        self.restart_count.load(Ordering::Relaxed)
227    }
228
229    fn last_panic(&self) -> Option<String> {
230        match self.last_panic.read() {
231            Ok(slot) => slot.clone(),
232            Err(poison) => poison.into_inner().clone(),
233        }
234    }
235
236    fn set_restore_action(&self, action: Option<AppAction>) {
237        match self.restore_action.write() {
238            Ok(mut slot) => *slot = action,
239            Err(poison) => *poison.into_inner() = action,
240        }
241    }
242}
243
244#[derive(uniffi::Object)]
245pub struct FfiDesktopNearby {
246    service: Arc<desktop_nearby::DesktopNearbyService>,
247}
248
249#[uniffi::export]
250impl FfiApp {
251    #[uniffi::constructor]
252    pub fn new(data_dir: String, _keychain_group: String, _app_version: String) -> Arc<Self> {
253        match panic::catch_unwind(AssertUnwindSafe(|| new_ffi_app_inner(data_dir))) {
254            Ok(app) => app,
255            Err(payload) => ffi_app_failure(format!(
256                "Iris could not start: {}",
257                panic_payload_to_string(payload)
258            )),
259        }
260    }
261
262    pub fn state(&self) -> AppState {
263        self.perf.state.fetch_add(1, Ordering::Relaxed);
264        ffi_or("ffiapp.state", ffi_failure_state(), || {
265            match self.shared_state.read() {
266                Ok(slot) => slot.clone(),
267                Err(poison) => poison.into_inner().clone(),
268            }
269        })
270    }
271
272    pub fn dispatch(&self, action: AppAction) {
273        self.perf.dispatch.fetch_add(1, Ordering::Relaxed);
274        ffi_or("ffiapp.dispatch", (), || {
275            crate::perflog!("ffi.dispatch action={:?}", std::mem::discriminant(&action));
276            self.recovery.remember_action(&action);
277            let _ = self.foreground_tx.send(CoreMsg::Action(action));
278        })
279    }
280
281    /// Snapshot of the per-FFI-method call counts since `FfiApp` was
282    /// created. Used by `tests/perf_budgets.rs` to assert that hot
283    /// shell paths stay within their expected FFI traffic. Reading
284    /// is best-effort `Relaxed` — call counts are advisory not
285    /// transactional.
286    pub fn perf_counters(&self) -> FfiPerfCountersSnapshot {
287        FfiPerfCountersSnapshot {
288            state: self.perf.state.load(Ordering::Relaxed),
289            dispatch: self.perf.dispatch.load(Ordering::Relaxed),
290            search: self.perf.search.load(Ordering::Relaxed),
291            ingest_nearby_event_json: self.perf.ingest_nearby_event_json.load(Ordering::Relaxed),
292            export_support_bundle_json: self
293                .perf
294                .export_support_bundle_json
295                .load(Ordering::Relaxed),
296            peer_profile_debug: self.perf.peer_profile_debug.load(Ordering::Relaxed),
297            mutual_groups: self.perf.mutual_groups.load(Ordering::Relaxed),
298            prepare_for_suspend: self.perf.prepare_for_suspend.load(Ordering::Relaxed),
299        }
300    }
301
302    /// Snapshot of core-internal hot-loop counters. Used by
303    /// `tests/perf_budgets.rs` to budget work that happens entirely
304    /// inside the core thread — the FFI surface counters above can't
305    /// see those. Default snapshot on timeout so a wedged core can't
306    /// pin the test on a perpetual wait.
307    pub fn core_perf_counters(&self) -> CorePerfCountersSnapshot {
308        ffi_or(
309            "ffiapp.core_perf_counters",
310            CorePerfCountersSnapshot::default(),
311            || {
312                let (reply_tx, reply_rx) = flume::bounded(1);
313                if self
314                    .foreground_tx
315                    .send(CoreMsg::CorePerfCounters(reply_tx))
316                    .is_err()
317                {
318                    return CorePerfCountersSnapshot::default();
319                }
320                match reply_rx.recv_timeout(Duration::from_secs(2)) {
321                    Ok(snapshot) => CorePerfCountersSnapshot {
322                        debug_snapshot_builds: snapshot.debug_snapshot_builds,
323                    },
324                    Err(_) => CorePerfCountersSnapshot::default(),
325                }
326            },
327        )
328    }
329
330    /// Grouped Signal-style search: filters the in-memory chat list
331    /// into contacts/groups by display name + subtitle + chat id, and
332    /// runs the SQLite FTS5 index for the messages section. Optional
333    /// `scope_chat_id` restricts message hits to a single thread (the
334    /// "search in this chat" pill in the desktop sidebar). Returns an
335    /// empty snapshot for empty / whitespace queries.
336    pub fn search(
337        &self,
338        query: String,
339        scope_chat_id: Option<String>,
340        limit: u32,
341    ) -> SearchResultSnapshot {
342        self.perf.search.fetch_add(1, Ordering::Relaxed);
343        ffi_or(
344            "ffiapp.search",
345            SearchResultSnapshot::empty(query.clone(), scope_chat_id.clone()),
346            || {
347                let trimmed = query.trim();
348                if trimmed.is_empty() {
349                    return SearchResultSnapshot::empty(query.clone(), scope_chat_id.clone());
350                }
351                let limit = limit.max(1) as usize;
352                let state_snapshot = match self.shared_state.read() {
353                    Ok(slot) => slot.clone(),
354                    Err(poison) => poison.into_inner().clone(),
355                };
356                let (contacts, groups) = if scope_chat_id.is_some() {
357                    (Vec::new(), Vec::new())
358                } else {
359                    filter_threads_for_search(&state_snapshot.chat_list, trimmed)
360                };
361                let shared_db = self.shared_db_snapshot();
362                let messages = match shared_db.as_ref() {
363                    Some(shared) => match shared.lock() {
364                        Ok(conn) => crate::core::search_messages_fts(
365                            &conn,
366                            trimmed,
367                            scope_chat_id.as_deref(),
368                            limit,
369                        )
370                        .unwrap_or_default(),
371                        Err(poison) => crate::core::search_messages_fts(
372                            &poison.into_inner(),
373                            trimmed,
374                            scope_chat_id.as_deref(),
375                            limit,
376                        )
377                        .unwrap_or_default(),
378                    },
379                    None => Vec::new(),
380                };
381                let enriched = enrich_message_hits(messages, &state_snapshot.chat_list);
382                // The shortcut row only makes sense for global search.
383                // Once the user has scoped to a single chat, an npub
384                // paste should still search that chat's messages, not
385                // jump out of the scope.
386                let shortcut = if scope_chat_id.is_none() {
387                    chat_input_shortcut(trimmed)
388                } else {
389                    None
390                };
391                SearchResultSnapshot {
392                    query,
393                    scope_chat_id,
394                    contacts,
395                    groups,
396                    messages: enriched,
397                    shortcut,
398                }
399            },
400        )
401    }
402
403    /// Bounded chat projection for a route-selected chat. Unlike
404    /// `OpenChat`, this is a direct read from the shared state/SQLite
405    /// handle and never waits behind the core action queue. Shells use
406    /// it as the first paint for chat screens; the core still receives
407    /// `OpenChat` for unread clearing, subscriptions, and side effects.
408    pub fn chat_snapshot(&self, chat_id: String, limit: u32) -> Option<CurrentChatSnapshot> {
409        ffi_or("ffiapp.chat_snapshot", None, || {
410            let state_snapshot = match self.shared_state.read() {
411                Ok(slot) => slot.clone(),
412                Err(poison) => poison.into_inner().clone(),
413            };
414            crate::core::chat_snapshot_from_state_and_db(
415                &state_snapshot,
416                self.shared_db_snapshot().as_ref(),
417                &chat_id,
418                limit.max(1) as usize,
419            )
420        })
421    }
422
423    pub fn chat_snapshot_before(
424        &self,
425        chat_id: String,
426        before_message_id: String,
427        limit: u32,
428    ) -> Option<CurrentChatSnapshot> {
429        ffi_or("ffiapp.chat_snapshot_before", None, || {
430            let state_snapshot = match self.shared_state.read() {
431                Ok(slot) => slot.clone(),
432                Err(poison) => poison.into_inner().clone(),
433            };
434            crate::core::chat_snapshot_before_from_state_and_db(
435                &state_snapshot,
436                self.shared_db_snapshot().as_ref(),
437                &chat_id,
438                &before_message_id,
439                limit.max(1) as usize,
440            )
441        })
442    }
443
444    pub fn chat_snapshot_around_message(
445        &self,
446        chat_id: String,
447        message_id: String,
448        before_limit: u32,
449        after_limit: u32,
450    ) -> Option<CurrentChatSnapshot> {
451        ffi_or("ffiapp.chat_snapshot_around_message", None, || {
452            let state_snapshot = match self.shared_state.read() {
453                Ok(slot) => slot.clone(),
454                Err(poison) => poison.into_inner().clone(),
455            };
456            crate::core::chat_snapshot_around_message_from_state_and_db(
457                &state_snapshot,
458                self.shared_db_snapshot().as_ref(),
459                &chat_id,
460                &message_id,
461                before_limit as usize,
462                after_limit as usize,
463            )
464        })
465    }
466
467    pub fn ingest_nearby_event_json(&self, event_json: String) -> bool {
468        self.perf
469            .ingest_nearby_event_json
470            .fetch_add(1, Ordering::Relaxed);
471        self.ingest_nearby_event_json_with_transport(event_json, String::new())
472    }
473
474    pub fn ingest_nearby_event_json_with_transport(
475        &self,
476        event_json: String,
477        transport: String,
478    ) -> bool {
479        ffi_or("ffiapp.ingest_nearby_event_json", false, || {
480            let event = match serde_json::from_str::<nostr_sdk::prelude::Event>(&event_json) {
481                Ok(event) => event,
482                Err(_) => return false,
483            };
484            if event.verify().is_err() {
485                return false;
486            }
487            self.background_tx
488                .send(CoreMsg::Internal(Box::new(InternalEvent::NearbyEvent {
489                    event,
490                    transport,
491                })))
492                .is_ok()
493        })
494    }
495
496    pub fn build_nearby_presence_event_json(
497        &self,
498        peer_id: String,
499        my_nonce: String,
500        their_nonce: String,
501        profile_event_id: String,
502    ) -> String {
503        ffi_or(
504            "ffiapp.build_nearby_presence_event_json",
505            String::new(),
506            || {
507                let (reply_tx, reply_rx) = flume::bounded(1);
508                if self
509                    .background_tx
510                    .send(CoreMsg::BuildNearbyPresenceEvent {
511                        peer_id,
512                        my_nonce,
513                        their_nonce,
514                        profile_event_id,
515                        reply_tx,
516                    })
517                    .is_err()
518                {
519                    return String::new();
520                }
521                reply_rx
522                    .recv_timeout(Duration::from_secs(2))
523                    .unwrap_or_default()
524            },
525        )
526    }
527
528    pub fn verify_nearby_presence_event_json(
529        &self,
530        event_json: String,
531        peer_id: String,
532        my_nonce: String,
533        their_nonce: String,
534    ) -> String {
535        ffi_or(
536            "ffiapp.verify_nearby_presence_event_json",
537            String::new(),
538            || verify_nearby_presence_event_json(&event_json, &peer_id, &my_nonce, &their_nonce),
539        )
540    }
541
542    pub fn nearby_encode_frame(&self, envelope_json: String) -> Vec<u8> {
543        ffi_or("ffiapp.nearby_encode_frame", Vec::new(), || {
544            nostr_double_ratchet_runtime::encode_nearby_frame_json(&envelope_json)
545                .unwrap_or_default()
546        })
547    }
548
549    pub fn nearby_decode_frame(&self, frame: Vec<u8>) -> String {
550        ffi_or("ffiapp.nearby_decode_frame", String::new(), || {
551            nostr_double_ratchet_runtime::decode_nearby_frame_json(&frame).unwrap_or_default()
552        })
553    }
554
555    pub fn nearby_frame_body_len_from_header(&self, header: Vec<u8>) -> i32 {
556        ffi_or("ffiapp.nearby_frame_body_len_from_header", -1, || {
557            nostr_double_ratchet_runtime::nearby_frame_body_len_from_header(&header)
558                .and_then(|len| i32::try_from(len).ok())
559                .unwrap_or(-1)
560        })
561    }
562
563    pub fn export_support_bundle_json(&self) -> String {
564        self.perf
565            .export_support_bundle_json
566            .fetch_add(1, Ordering::Relaxed);
567        ffi_or(
568            "ffiapp.export_support_bundle_json",
569            self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true),
570            || {
571                let (reply_tx, reply_rx) = flume::bounded(1);
572                if self
573                    .foreground_tx
574                    .send(CoreMsg::ExportSupportBundle(reply_tx))
575                    .is_err()
576                {
577                    return self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true);
578                }
579                match reply_rx.recv_timeout(Duration::from_secs(2)) {
580                    Ok(json) => self.support_bundle_json_with_ffi_diagnostics(json, false),
581                    Err(_) => self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true),
582                }
583            },
584        )
585    }
586
587    pub fn peer_profile_debug(&self, owner_input: String) -> Option<PeerProfileDebugSnapshot> {
588        self.perf.peer_profile_debug.fetch_add(1, Ordering::Relaxed);
589        ffi_or("ffiapp.peer_profile_debug", None, || {
590            let (reply_tx, reply_rx) = flume::bounded(1);
591            if self
592                .foreground_tx
593                .send(CoreMsg::PeerProfileDebug {
594                    owner_input,
595                    reply_tx,
596                })
597                .is_err()
598            {
599                return None;
600            }
601            reply_rx.recv_timeout(Duration::from_secs(2)).ok().flatten()
602        })
603    }
604
605    pub fn mutual_groups(&self, owner_input: String) -> MutualGroupsSnapshot {
606        self.perf.mutual_groups.fetch_add(1, Ordering::Relaxed);
607        ffi_or(
608            "ffiapp.mutual_groups",
609            MutualGroupsSnapshot::default(),
610            || {
611                let (reply_tx, reply_rx) = flume::bounded(1);
612                if self
613                    .foreground_tx
614                    .send(CoreMsg::MutualGroups {
615                        owner_input,
616                        reply_tx,
617                    })
618                    .is_err()
619                {
620                    return MutualGroupsSnapshot::default();
621                }
622                reply_rx
623                    .recv_timeout(Duration::from_secs(2))
624                    .unwrap_or_default()
625            },
626        )
627    }
628
629    pub fn prepare_for_suspend(&self) {
630        self.perf
631            .prepare_for_suspend
632            .fetch_add(1, Ordering::Relaxed);
633        ffi_or("ffiapp.prepare_for_suspend", (), || {
634            let (reply_tx, reply_rx) = flume::bounded(1);
635            if self
636                .foreground_tx
637                .send(CoreMsg::PrepareForSuspend(reply_tx))
638                .is_err()
639            {
640                return;
641            }
642            let _ = reply_rx.recv_timeout(Duration::from_secs(2));
643        })
644    }
645
646    pub fn shutdown(&self) {
647        ffi_or("ffiapp.shutdown", (), || {
648            let (reply_tx, reply_rx) = flume::bounded(1);
649            if self
650                .foreground_tx
651                .send(CoreMsg::Shutdown(Some(reply_tx)))
652                .is_err()
653            {
654                return;
655            }
656            let _ = reply_rx.recv_timeout(Duration::from_secs(2));
657        })
658    }
659
660    fn support_bundle_json_with_ffi_diagnostics(
661        &self,
662        rust_json: String,
663        core_support_bundle_timed_out: bool,
664    ) -> String {
665        let mut object = serde_json::from_str::<serde_json::Value>(&rust_json)
666            .ok()
667            .and_then(|value| value.as_object().cloned())
668            .unwrap_or_default();
669        let now_ms = crate::perflog::now_ms();
670        let last_started_at_ms = self
671            .queue_metrics
672            .last_batch_started_at_ms
673            .load(Ordering::Relaxed);
674        let last_finished_at_ms = self
675            .queue_metrics
676            .last_batch_finished_at_ms
677            .load(Ordering::Relaxed);
678        let batch_active = self.queue_metrics.batch_active.load(Ordering::Acquire);
679        let active_batch_age_ms = if batch_active && last_started_at_ms > 0 {
680            Some(now_ms.saturating_sub(last_started_at_ms))
681        } else {
682            None
683        };
684        let last_batch_started_ago_ms = if last_started_at_ms > 0 {
685            Some(now_ms.saturating_sub(last_started_at_ms))
686        } else {
687            None
688        };
689        let last_batch_finished_ago_ms = if last_finished_at_ms > 0 {
690            Some(now_ms.saturating_sub(last_finished_at_ms))
691        } else {
692            None
693        };
694        object.insert(
695            "ffi_queue".to_string(),
696            serde_json::json!({
697                "core_support_bundle_timed_out": core_support_bundle_timed_out,
698                "foreground_pending": self.foreground_rx.len(),
699                "background_pending": self.background_rx.len(),
700                "foreground_processed": self.queue_metrics.foreground_processed.load(Ordering::Relaxed),
701                "background_processed": self.queue_metrics.background_processed.load(Ordering::Relaxed),
702                "batch_active": batch_active,
703                "active_batch_age_ms": active_batch_age_ms,
704                "last_batch_started_ago_ms": last_batch_started_ago_ms,
705                "last_batch_finished_ago_ms": last_batch_finished_ago_ms,
706                "last_batch_size": self.queue_metrics.last_batch_size.load(Ordering::Relaxed),
707                "last_batch_foreground_count": self.queue_metrics.last_batch_foreground_count.load(Ordering::Relaxed),
708                "last_batch_background_count": self.queue_metrics.last_batch_background_count.load(Ordering::Relaxed),
709                "core_restarts": self.recovery.restart_count(),
710                "last_core_panic": self.recovery.last_panic(),
711                "has_cached_restore_action": self.recovery.restore_action().is_some(),
712            }),
713        );
714        serde_json::Value::Object(object).to_string()
715    }
716
717    pub fn listen_for_updates(&self, reconciler: Box<dyn AppReconciler>) {
718        if self
719            .listening
720            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
721            .is_err()
722        {
723            return;
724        }
725
726        let update_rx = self.update_rx.clone();
727        let recovery = self.recovery.clone();
728        let spawn_result = thread::Builder::new()
729            .name("iris-updates".to_string())
730            .spawn(move || {
731                // Drain queued updates and deliver the latest FullState only.
732                // The shell side already discards FullStates with stale `rev`,
733                // but the JNI marshal of an AppState is itself ~20-30 ms and
734                // each push triggers a full Compose recomposition (~400 ms on
735                // Android debug). When the core emits a tight burst of 3-4
736                // updates (OpenChat → SyncComplete → FetchCatchUpEvents → …)
737                // the UI keeps re-rendering for seconds even though only the
738                // final state mattered.
739                //
740                // PersistAccountBundle is a side-effect (key persistence), not
741                // a UI update, so we never collapse those — every one must run.
742                while let Ok(first) = update_rx.recv() {
743                    let mut latest_full_state: Option<AppUpdate> = None;
744                    let mut before_full_state: Vec<AppUpdate> = Vec::new();
745                    let mut after_full_state: Vec<AppUpdate> = Vec::new();
746                    let process = |update: AppUpdate,
747                                   latest: &mut Option<AppUpdate>,
748                                   before: &mut Vec<AppUpdate>,
749                                   after: &mut Vec<AppUpdate>| {
750                        recovery.remember_update(&update);
751                        enqueue_update_for_delivery(update, latest, before, after);
752                    };
753                    process(
754                        first,
755                        &mut latest_full_state,
756                        &mut before_full_state,
757                        &mut after_full_state,
758                    );
759                    while let Ok(next) = update_rx.try_recv() {
760                        process(
761                            next,
762                            &mut latest_full_state,
763                            &mut before_full_state,
764                            &mut after_full_state,
765                        );
766                    }
767                    for update in before_full_state
768                        .into_iter()
769                        .chain(latest_full_state)
770                        .chain(after_full_state)
771                    {
772                        let kind = match &update {
773                            AppUpdate::FullState(_) => "FullState",
774                            AppUpdate::PersistAccountBundle { .. } => "PersistAccountBundle",
775                            AppUpdate::NearbyPublishedEvent { .. } => "NearbyPublishedEvent",
776                        };
777                        let t0 = crate::perflog::now_ms();
778                        crate::perflog!("reconcile.start kind={kind}");
779                        if panic::catch_unwind(AssertUnwindSafe(|| reconciler.reconcile(update)))
780                            .is_err()
781                        {
782                            crate::perflog!("reconcile.failed kind={kind}");
783                            continue;
784                        }
785                        crate::perflog!(
786                            "reconcile.end kind={kind} elapsed_ms={}",
787                            crate::perflog::now_ms().saturating_sub(t0)
788                        );
789                    }
790                }
791            });
792        if let Err(error) = spawn_result {
793            crate::perflog!("updates.spawn.failed error={error}");
794            self.listening.store(false, Ordering::SeqCst);
795        }
796    }
797}
798
799impl FfiApp {
800    fn shared_db_snapshot(&self) -> Option<crate::core::SharedConnection> {
801        match self.shared_db.read() {
802            Ok(slot) => slot.clone(),
803            Err(poison) => poison.into_inner().clone(),
804        }
805    }
806}
807
808#[uniffi::export]
809impl FfiDesktopNearby {
810    #[uniffi::constructor]
811    pub fn new(app: Arc<FfiApp>, observer: Box<dyn DesktopNearbyObserver>) -> Arc<Self> {
812        Arc::new(Self {
813            service: desktop_nearby::DesktopNearbyService::new(app, observer.into()),
814        })
815    }
816
817    pub fn start(&self, local_name: String) {
818        self.service.start(local_name);
819    }
820
821    pub fn stop(&self) {
822        self.service.stop();
823    }
824
825    pub fn snapshot(&self) -> DesktopNearbySnapshot {
826        self.service.snapshot()
827    }
828
829    pub fn publish(&self, event_id: String, kind: u32, created_at_secs: u64, event_json: String) {
830        self.service
831            .publish(event_id, kind, created_at_secs, event_json);
832    }
833}
834
835fn new_ffi_app_inner(data_dir: String) -> Arc<FfiApp> {
836    let (update_tx, update_rx) = flume::unbounded();
837    let (foreground_tx, foreground_rx) = flume::unbounded();
838    let (background_tx, background_rx) = flume::unbounded();
839    let shared_state = Arc::new(RwLock::new(AppState::empty()));
840    let queue_metrics = Arc::new(CoreQueueMetrics::default());
841    let recovery = Arc::new(CoreRecoveryState::default());
842    let shared_db = Arc::new(RwLock::new(None));
843
844    let update_tx_for_error = update_tx.clone();
845    match AppCore::try_new(
846        update_tx.clone(),
847        background_tx.clone(),
848        data_dir.clone(),
849        shared_state.clone(),
850    ) {
851        Ok(core) => {
852            set_shared_db(&shared_db, Some(core.shared_db()));
853            let spawn_result = spawn_core_supervisor(
854                core,
855                CoreSupervisor {
856                    data_dir,
857                    update_tx: update_tx.clone(),
858                    core_sender: background_tx.clone(),
859                    foreground_rx: foreground_rx.clone(),
860                    background_rx: background_rx.clone(),
861                    shared_state: shared_state.clone(),
862                    shared_db: shared_db.clone(),
863                    queue_metrics: queue_metrics.clone(),
864                    recovery: recovery.clone(),
865                },
866            );
867            if let Err(error) = spawn_result {
868                publish_core_failure_state(
869                    &shared_state,
870                    &update_tx_for_error,
871                    format!("Iris could not start: {error}"),
872                );
873            }
874        }
875        Err(error) => {
876            publish_core_failure_state(&shared_state, &update_tx_for_error, error.to_string());
877        }
878    }
879
880    Arc::new(FfiApp {
881        foreground_tx,
882        foreground_rx,
883        background_tx,
884        background_rx,
885        update_rx,
886        listening: AtomicBool::new(false),
887        shared_state,
888        shared_db,
889        perf: FfiPerfCounters::default(),
890        queue_metrics,
891        recovery,
892    })
893}
894
895fn ffi_app_failure(message: String) -> Arc<FfiApp> {
896    let (_update_tx, update_rx) = flume::unbounded();
897    let (foreground_tx, foreground_rx) = flume::unbounded();
898    let (background_tx, background_rx) = flume::unbounded();
899    let mut state = AppState::empty();
900    state.toast = Some(message);
901    state.rev = 1;
902    let shared_state = Arc::new(RwLock::new(state));
903    Arc::new(FfiApp {
904        foreground_tx,
905        foreground_rx,
906        background_tx,
907        background_rx,
908        update_rx,
909        listening: AtomicBool::new(false),
910        shared_state,
911        shared_db: Arc::new(RwLock::new(None)),
912        perf: FfiPerfCounters::default(),
913        queue_metrics: Arc::new(CoreQueueMetrics::default()),
914        recovery: Arc::new(CoreRecoveryState::default()),
915    })
916}
917
918struct CoreSupervisor {
919    data_dir: String,
920    update_tx: Sender<AppUpdate>,
921    core_sender: Sender<CoreMsg>,
922    foreground_rx: Receiver<CoreMsg>,
923    background_rx: Receiver<CoreMsg>,
924    shared_state: Arc<RwLock<AppState>>,
925    shared_db: Arc<RwLock<Option<crate::core::SharedConnection>>>,
926    queue_metrics: Arc<CoreQueueMetrics>,
927    recovery: Arc<CoreRecoveryState>,
928}
929
930fn spawn_core_supervisor(
931    core: AppCore,
932    supervisor: CoreSupervisor,
933) -> std::io::Result<thread::JoinHandle<()>> {
934    thread::Builder::new()
935        .name("iris-core".to_string())
936        .spawn(move || {
937            let mut core_slot = Some(core);
938            // User actions and synchronous shell requests must not sit behind
939            // relay/nearby backlog. The core keeps internal work on a
940            // background queue and drains it in bounded chunks between
941            // foreground batches.
942            while let Ok(batch) =
943                recv_core_batch(&supervisor.foreground_rx, &supervisor.background_rx)
944            {
945                let batch_size = batch.len();
946                let foreground_count = batch
947                    .iter()
948                    .filter(|msg| is_foreground_core_msg(msg))
949                    .count() as u64;
950                let background_count = batch_size as u64 - foreground_count;
951                supervisor.queue_metrics.mark_batch_start(
952                    batch_size as u64,
953                    foreground_count,
954                    background_count,
955                );
956                let t0 = crate::perflog::now_ms();
957                crate::perflog!("core.batch.start size={batch_size}");
958                let result = match core_slot.as_mut() {
959                    Some(core) => catch_core_batch(|| handle_core_batch_responsive(core, batch)),
960                    None => break,
961                };
962                let elapsed_ms = crate::perflog::now_ms().saturating_sub(t0);
963                supervisor
964                    .queue_metrics
965                    .mark_batch_finished(foreground_count, background_count);
966                match result {
967                    Ok(true) => {
968                        crate::perflog!(
969                            "core.batch.end size={batch_size} elapsed_ms={elapsed_ms}"
970                        );
971                    }
972                    Ok(false) => {
973                        crate::perflog!(
974                            "core.batch.end size={batch_size} elapsed_ms={elapsed_ms} result=shutdown"
975                        );
976                        break;
977                    }
978                    Err(error) => {
979                        if let Some(mut failed_core) = core_slot.take() {
980                            failed_core.record_core_panic(error.clone());
981                        }
982                        crate::perflog!(
983                            "core.batch.end size={batch_size} elapsed_ms={elapsed_ms} result=panic"
984                        );
985                        match recover_core_after_panic(&supervisor, error) {
986                            Some(core) => core_slot = Some(core),
987                            None => break,
988                        }
989                    }
990                }
991            }
992        })
993}
994
995fn recover_core_after_panic(supervisor: &CoreSupervisor, detail: String) -> Option<AppCore> {
996    let restart_count = supervisor.recovery.mark_panic(detail);
997    crate::perflog!("core.supervisor.restart count={restart_count}");
998    set_shared_db(&supervisor.shared_db, None);
999
1000    let mut core = match AppCore::try_new(
1001        supervisor.update_tx.clone(),
1002        supervisor.core_sender.clone(),
1003        supervisor.data_dir.clone(),
1004        supervisor.shared_state.clone(),
1005    ) {
1006        Ok(core) => core,
1007        Err(error) => {
1008            crate::perflog!("core.supervisor.restart.failed count={restart_count} error={error}");
1009            publish_core_failure_state(
1010                &supervisor.shared_state,
1011                &supervisor.update_tx,
1012                CORE_RESTART_TOAST.to_string(),
1013            );
1014            return None;
1015        }
1016    };
1017
1018    set_shared_db(&supervisor.shared_db, Some(core.shared_db()));
1019    if let Some(action) = supervisor.recovery.restore_action() {
1020        crate::perflog!(
1021            "core.supervisor.restore action={:?}",
1022            std::mem::discriminant(&action)
1023        );
1024        match catch_core_batch(|| core.handle_message(CoreMsg::Action(action))) {
1025            Ok(true) => {}
1026            Ok(false) => {
1027                publish_core_failure_state(
1028                    &supervisor.shared_state,
1029                    &supervisor.update_tx,
1030                    CORE_RESTART_TOAST.to_string(),
1031                );
1032                return None;
1033            }
1034            Err(error) => {
1035                core.mark_core_panic(format!("core recovery restore panic: {error}"));
1036                return None;
1037            }
1038        }
1039    }
1040
1041    crate::perflog!("core.supervisor.recovered count={restart_count}");
1042    Some(core)
1043}
1044
1045fn set_shared_db(
1046    shared_db: &Arc<RwLock<Option<crate::core::SharedConnection>>>,
1047    value: Option<crate::core::SharedConnection>,
1048) {
1049    match shared_db.write() {
1050        Ok(mut slot) => *slot = value,
1051        Err(poison) => *poison.into_inner() = value,
1052    }
1053}
1054
1055fn publish_core_failure_state(
1056    shared_state: &Arc<RwLock<AppState>>,
1057    update_tx: &Sender<AppUpdate>,
1058    message: String,
1059) {
1060    let mut state = match shared_state.read() {
1061        Ok(slot) => slot.clone(),
1062        Err(poison) => poison.into_inner().clone(),
1063    };
1064    state.toast = Some(message);
1065    state.rev = state.rev.saturating_add(1).max(1);
1066    match shared_state.write() {
1067        Ok(mut slot) => *slot = state.clone(),
1068        Err(poison) => *poison.into_inner() = state.clone(),
1069    }
1070    let _ = update_tx.send(AppUpdate::FullState(state));
1071}
1072
1073const CORE_FOREGROUND_BATCH_LIMIT: usize = 64;
1074const CORE_BACKGROUND_BATCH_LIMIT: usize = 16;
1075
1076fn recv_core_batch(
1077    foreground_rx: &Receiver<CoreMsg>,
1078    background_rx: &Receiver<CoreMsg>,
1079) -> Result<Vec<CoreMsg>, flume::RecvError> {
1080    if let Some(batch) = try_recv_core_batch(foreground_rx, background_rx) {
1081        return Ok(batch);
1082    }
1083
1084    let (is_foreground, first) = flume::Selector::new()
1085        .recv(foreground_rx, |result| result.map(|msg| (true, msg)))
1086        .recv(background_rx, |result| result.map(|msg| (false, msg)))
1087        .wait()?;
1088    Ok(drain_core_batch_after_first(
1089        is_foreground,
1090        first,
1091        foreground_rx,
1092        background_rx,
1093    ))
1094}
1095
1096fn try_recv_core_batch(
1097    foreground_rx: &Receiver<CoreMsg>,
1098    background_rx: &Receiver<CoreMsg>,
1099) -> Option<Vec<CoreMsg>> {
1100    if let Ok(first) = foreground_rx.try_recv() {
1101        return Some(drain_core_batch_after_first(
1102            true,
1103            first,
1104            foreground_rx,
1105            background_rx,
1106        ));
1107    }
1108    background_rx
1109        .try_recv()
1110        .ok()
1111        .map(|first| drain_core_batch_after_first(false, first, foreground_rx, background_rx))
1112}
1113
1114fn drain_core_batch_after_first(
1115    is_foreground: bool,
1116    first: CoreMsg,
1117    foreground_rx: &Receiver<CoreMsg>,
1118    background_rx: &Receiver<CoreMsg>,
1119) -> Vec<CoreMsg> {
1120    let mut batch = Vec::with_capacity(if is_foreground {
1121        CORE_FOREGROUND_BATCH_LIMIT
1122    } else {
1123        CORE_BACKGROUND_BATCH_LIMIT
1124    });
1125    batch.push(first);
1126    drain_foreground_messages(&mut batch, foreground_rx);
1127    if batch.iter().any(is_foreground_core_msg) {
1128        return batch;
1129    }
1130
1131    while batch.len() < CORE_BACKGROUND_BATCH_LIMIT {
1132        let Ok(next) = background_rx.try_recv() else {
1133            break;
1134        };
1135        batch.push(next);
1136        drain_foreground_messages(&mut batch, foreground_rx);
1137        if batch.iter().any(is_foreground_core_msg) {
1138            break;
1139        }
1140    }
1141    batch
1142}
1143
1144fn drain_foreground_messages(batch: &mut Vec<CoreMsg>, foreground_rx: &Receiver<CoreMsg>) {
1145    while batch.len() < CORE_FOREGROUND_BATCH_LIMIT {
1146        let Ok(next) = foreground_rx.try_recv() else {
1147            break;
1148        };
1149        batch.push(next);
1150    }
1151}
1152
1153fn handle_core_batch_responsive(core: &mut AppCore, messages: Vec<CoreMsg>) -> bool {
1154    if messages.len() <= 1 || !messages.iter().any(is_foreground_core_msg) {
1155        return core.handle_messages(messages);
1156    }
1157
1158    let mut foreground = Vec::new();
1159    let mut background = Vec::new();
1160    for message in messages {
1161        if is_foreground_core_msg(&message) {
1162            foreground.push(message);
1163        } else {
1164            background.push(message);
1165        }
1166    }
1167
1168    for message in foreground {
1169        if !core.handle_message(message) {
1170            return false;
1171        }
1172    }
1173    background.is_empty() || core.handle_messages(background)
1174}
1175
1176fn catch_core_batch<F>(f: F) -> Result<bool, String>
1177where
1178    F: FnOnce() -> bool,
1179{
1180    panic::catch_unwind(AssertUnwindSafe(f)).map_err(panic_payload_to_string)
1181}
1182
1183fn ffi_or<T, F>(label: &'static str, fallback: T, f: F) -> T
1184where
1185    F: FnOnce() -> T,
1186{
1187    match panic::catch_unwind(AssertUnwindSafe(f)) {
1188        Ok(value) => value,
1189        Err(payload) => {
1190            crate::perflog!(
1191                "ffi.panic label={label} detail={}",
1192                panic_payload_to_string(payload)
1193            );
1194            fallback
1195        }
1196    }
1197}
1198
1199fn ffi_failure_state() -> AppState {
1200    let mut state = AppState::empty();
1201    state.toast = Some("Iris needs restart. Copy support bundle in Settings.".to_string());
1202    state
1203}
1204
1205fn suppressed_mobile_push_resolution() -> MobilePushNotificationResolution {
1206    MobilePushNotificationResolution {
1207        should_show: false,
1208        title: String::new(),
1209        body: String::new(),
1210        payload_json: "{}".to_string(),
1211    }
1212}
1213
1214fn panic_payload_to_string(payload: Box<dyn Any + Send>) -> String {
1215    if let Some(message) = payload.downcast_ref::<&str>() {
1216        (*message).to_string()
1217    } else if let Some(message) = payload.downcast_ref::<String>() {
1218        message.clone()
1219    } else {
1220        "unknown panic".to_string()
1221    }
1222}
1223
1224fn is_foreground_core_msg(message: &CoreMsg) -> bool {
1225    !matches!(message, CoreMsg::Internal(_))
1226}
1227
1228#[cfg(test)]
1229mod core_queue_tests {
1230    use super::*;
1231
1232    fn background_msg(index: usize) -> CoreMsg {
1233        CoreMsg::Internal(Box::new(InternalEvent::DebugLog {
1234            category: "test.background".to_string(),
1235            detail: index.to_string(),
1236        }))
1237    }
1238
1239    #[test]
1240    fn foreground_queue_preempts_background_backlog() {
1241        let (foreground_tx, foreground_rx) = flume::unbounded();
1242        let (background_tx, background_rx) = flume::unbounded();
1243        for index in 0..100 {
1244            background_tx.send(background_msg(index)).unwrap();
1245        }
1246        foreground_tx
1247            .send(CoreMsg::Action(AppAction::NavigateBack))
1248            .unwrap();
1249
1250        let batch = recv_core_batch(&foreground_rx, &background_rx).unwrap();
1251
1252        assert!(matches!(
1253            batch.first(),
1254            Some(CoreMsg::Action(AppAction::NavigateBack))
1255        ));
1256        assert!(
1257            batch.iter().all(is_foreground_core_msg),
1258            "foreground work should not be bundled behind background backlog"
1259        );
1260    }
1261
1262    #[test]
1263    fn background_queue_drains_in_bounded_chunks() {
1264        let (_foreground_tx, foreground_rx) = flume::unbounded();
1265        let (background_tx, background_rx) = flume::unbounded();
1266        for index in 0..100 {
1267            background_tx.send(background_msg(index)).unwrap();
1268        }
1269
1270        let batch = recv_core_batch(&foreground_rx, &background_rx).unwrap();
1271
1272        assert_eq!(batch.len(), CORE_BACKGROUND_BATCH_LIMIT);
1273        assert!(batch.iter().all(|msg| !is_foreground_core_msg(msg)));
1274    }
1275
1276    #[test]
1277    fn route_chat_snapshot_uses_chat_list_without_core_queue() {
1278        let state = build_large_test_app_state(80, 20, 1_200);
1279        let chat_id = state.chat_list[10].chat_id.clone();
1280
1281        let snapshot =
1282            crate::core::chat_snapshot_from_state_and_db(&state, None, &chat_id, 80).unwrap();
1283
1284        assert_eq!(snapshot.chat_id, chat_id);
1285        assert_eq!(snapshot.display_name, state.chat_list[10].display_name);
1286        assert!(snapshot.messages.is_empty());
1287    }
1288
1289    #[test]
1290    fn route_chat_snapshot_requires_account() {
1291        let mut state = build_large_test_app_state(80, 20, 1_200);
1292        state.account = None;
1293        let chat_id = state.chat_list[10].chat_id.clone();
1294
1295        assert!(crate::core::chat_snapshot_from_state_and_db(&state, None, &chat_id, 80).is_none());
1296    }
1297}
1298
1299fn filter_threads_for_search(
1300    chat_list: &[ChatThreadSnapshot],
1301    query: &str,
1302) -> (Vec<ChatThreadSnapshot>, Vec<ChatThreadSnapshot>) {
1303    let needle = query.to_lowercase();
1304    let mut contacts = Vec::new();
1305    let mut groups = Vec::new();
1306    for chat in chat_list {
1307        if !thread_matches_query(chat, &needle) {
1308            continue;
1309        }
1310        match chat.kind {
1311            ChatKind::Direct => contacts.push(chat.clone()),
1312            ChatKind::Group => groups.push(chat.clone()),
1313        }
1314    }
1315    (contacts, groups)
1316}
1317
1318fn thread_matches_query(chat: &ChatThreadSnapshot, needle_lower: &str) -> bool {
1319    let candidates: [&str; 7] = [
1320        &chat.display_name,
1321        chat.nickname.as_deref().unwrap_or(""),
1322        chat.profile_name.as_deref().unwrap_or(""),
1323        chat.about.as_deref().unwrap_or(""),
1324        chat.subtitle.as_deref().unwrap_or(""),
1325        &chat.draft,
1326        &chat.chat_id,
1327    ];
1328    candidates
1329        .iter()
1330        .any(|field| field.to_lowercase().contains(needle_lower))
1331}
1332
1333fn enrich_message_hits(
1334    hits: Vec<crate::core::PersistedMessageSearchHit>,
1335    chat_list: &[ChatThreadSnapshot],
1336) -> Vec<MessageSearchHit> {
1337    use std::collections::HashMap;
1338    let lookup: HashMap<&str, &ChatThreadSnapshot> = chat_list
1339        .iter()
1340        .map(|chat| (chat.chat_id.as_str(), chat))
1341        .collect();
1342    hits.into_iter()
1343        .map(|hit| {
1344            let parent = lookup.get(hit.chat_id.as_str());
1345            let display_name = parent
1346                .map(|chat| chat.display_name.clone())
1347                .unwrap_or_else(|| short_chat_label(&hit.chat_id));
1348            let picture_url = parent.and_then(|chat| chat.picture_url.clone());
1349            let kind = parent
1350                .map(|chat| chat.kind.clone())
1351                .unwrap_or(ChatKind::Direct);
1352            MessageSearchHit {
1353                chat_id: hit.chat_id,
1354                message_id: hit.message_id,
1355                chat_display_name: display_name,
1356                chat_picture_url: picture_url,
1357                chat_kind: kind,
1358                author_pubkey: hit.author,
1359                body: hit.body,
1360                is_outgoing: hit.is_outgoing,
1361                created_at_secs: hit.created_at_secs,
1362            }
1363        })
1364        .collect()
1365}
1366
1367fn short_chat_label(chat_id: &str) -> String {
1368    let trimmed = chat_id.trim();
1369    if trimmed.len() > 12 {
1370        format!("{}…", &trimmed[..12])
1371    } else {
1372        trimmed.to_string()
1373    }
1374}
1375
1376fn verify_nearby_presence_event_json(
1377    event_json: &str,
1378    peer_id: &str,
1379    my_nonce: &str,
1380    their_nonce: &str,
1381) -> String {
1382    let Ok(event) = serde_json::from_str::<nostr_sdk::prelude::Event>(event_json) else {
1383        return String::new();
1384    };
1385    if event.verify().is_err() || event.kind.as_u16() != crate::core::NEARBY_PRESENCE_KIND {
1386        return String::new();
1387    }
1388    let Ok(content) = serde_json::from_str::<serde_json::Value>(&event.content) else {
1389        return String::new();
1390    };
1391    let get = |key: &str| {
1392        content
1393            .get(key)
1394            .and_then(|value| value.as_str())
1395            .unwrap_or("")
1396    };
1397    let transport = get("transport");
1398    if get("protocol") != "iris-nearby-v1"
1399        || !(transport == "ble" || transport == "nearby" || transport == "lan")
1400        || get("peer_id") != peer_id.trim()
1401        || get("my_nonce") != their_nonce.trim()
1402        || get("their_nonce") != my_nonce.trim()
1403    {
1404        return String::new();
1405    }
1406
1407    let now = SystemTime::now()
1408        .duration_since(UNIX_EPOCH)
1409        .unwrap_or_default()
1410        .as_secs();
1411    let expires_at = content
1412        .get("expires_at")
1413        .and_then(|value| value.as_u64())
1414        .unwrap_or(0);
1415    let created_at = event.created_at.as_secs();
1416    if expires_at < now
1417        || expires_at > now.saturating_add(300)
1418        || created_at.saturating_add(300) < now
1419        || created_at > now.saturating_add(300)
1420    {
1421        return String::new();
1422    }
1423
1424    let profile_event_id = get("profile_event_id");
1425    let profile_event_id = if profile_event_id.len() == 64 {
1426        profile_event_id
1427    } else {
1428        ""
1429    };
1430    serde_json::json!({
1431        "owner_pubkey_hex": event.pubkey.to_hex(),
1432        "profile_event_id": profile_event_id,
1433    })
1434    .to_string()
1435}
1436
1437impl Drop for FfiApp {
1438    fn drop(&mut self) {
1439        let _ = self.foreground_tx.send(CoreMsg::Shutdown(None));
1440    }
1441}
1442
1443#[uniffi::export]
1444pub fn normalize_peer_input(input: String) -> String {
1445    ffi_or("normalize_peer_input", String::new(), || {
1446        crate::core::normalize_peer_input_for_display(&input)
1447    })
1448}
1449
1450#[uniffi::export]
1451pub fn is_valid_peer_input(input: String) -> bool {
1452    ffi_or("is_valid_peer_input", false, || {
1453        crate::core::parse_peer_input(&input).is_ok()
1454    })
1455}
1456
1457/// Single source of truth for "is this typed text an npub or an
1458/// invite URL?". Used by the New Chat paste field, the chat-list
1459/// search bar, and the deep-link handler so all three branches agree
1460/// on what counts as a chat-opening shortcut. Returns `None` for
1461/// regular search-style text.
1462#[uniffi::export]
1463pub fn classify_chat_input(input: String) -> Option<ChatInputShortcut> {
1464    ffi_or("classify_chat_input", None, || chat_input_shortcut(&input))
1465}
1466
1467fn chat_input_shortcut(raw: &str) -> Option<ChatInputShortcut> {
1468    let trimmed = raw.trim();
1469    if trimmed.is_empty() {
1470        return None;
1471    }
1472    let lower = trimmed.to_lowercase();
1473    if lower.contains("://") && lower.contains('#') {
1474        return Some(ChatInputShortcut::Invite {
1475            invite_input: trimmed.to_string(),
1476            display: short_invite_display(trimmed),
1477        });
1478    }
1479    if crate::core::parse_peer_input(trimmed).is_ok() {
1480        use nostr::nips::nip19::ToBech32;
1481        let normalized = crate::core::normalize_peer_input_for_display(trimmed);
1482        if let Ok(pubkey) = nostr::PublicKey::parse(&normalized) {
1483            let npub = pubkey.to_bech32().unwrap_or_else(|_| normalized.clone());
1484            let display = short_npub_display(&npub);
1485            return Some(ChatInputShortcut::DirectPeer {
1486                peer_input: normalized,
1487                display,
1488                npub,
1489                pubkey_hex: pubkey.to_hex(),
1490            });
1491        }
1492    }
1493    None
1494}
1495
1496fn short_npub_display(npub: &str) -> String {
1497    if npub.len() > 16 {
1498        format!("{}…{}", &npub[..10], &npub[npub.len() - 4..])
1499    } else {
1500        npub.to_string()
1501    }
1502}
1503
1504fn short_invite_display(invite: &str) -> String {
1505    // Strip the scheme + host so the row reads "iris.to/invite/…" not
1506    // a 120-char URL. We don't try to parse the bech32 payload; the
1507    // visible host is enough context for the user.
1508    let after_scheme = invite
1509        .split_once("://")
1510        .map(|(_, rest)| rest)
1511        .unwrap_or(invite);
1512    if after_scheme.len() > 32 {
1513        format!("{}…", &after_scheme[..32])
1514    } else {
1515        after_scheme.to_string()
1516    }
1517}
1518
1519/// Convert any pubkey-shaped input (hex, npub, nprofile, …) to its
1520/// canonical lowercase-hex form. The empty string is returned when the
1521/// input can't be parsed as a public key — callers expecting hex
1522/// downstream can short-circuit on that.
1523#[uniffi::export]
1524pub fn peer_input_to_hex(input: String) -> String {
1525    ffi_or("peer_input_to_hex", String::new(), || {
1526        let normalized = crate::core::normalize_peer_input_for_display(&input);
1527        match nostr::PublicKey::parse(&normalized) {
1528            Ok(pubkey) => pubkey.to_hex(),
1529            Err(_) => String::new(),
1530        }
1531    })
1532}
1533
1534/// Convert any pubkey-shaped input (hex, npub, nprofile, …) to its npub form.
1535/// Returns the original string when it can't be parsed as a public key.
1536#[uniffi::export]
1537pub fn peer_input_to_npub(input: String) -> String {
1538    ffi_or("peer_input_to_npub", String::new(), || {
1539        use nostr::nips::nip19::ToBech32;
1540        let normalized = crate::core::normalize_peer_input_for_display(&input);
1541        match nostr::PublicKey::parse(&normalized) {
1542            Ok(pubkey) => pubkey.to_bech32().unwrap_or(normalized),
1543            Err(_) => normalized,
1544        }
1545    })
1546}
1547
1548#[uniffi::export]
1549pub fn build_summary() -> String {
1550    ffi_or("build_summary", String::new(), crate::core::build_summary)
1551}
1552
1553#[uniffi::export]
1554pub fn relay_set_id() -> String {
1555    ffi_or("relay_set_id", String::new(), || {
1556        crate::core::relay_set_id().to_string()
1557    })
1558}
1559
1560#[uniffi::export]
1561pub fn proxied_image_url(
1562    original_src: String,
1563    preferences: PreferencesSnapshot,
1564    width: Option<u32>,
1565    height: Option<u32>,
1566    square: bool,
1567) -> String {
1568    ffi_or("proxied_image_url", original_src.clone(), || {
1569        image_proxy::proxied_image_url(&original_src, &preferences, width, height, square)
1570    })
1571}
1572
1573#[uniffi::export]
1574pub fn is_trusted_test_build() -> bool {
1575    ffi_or(
1576        "is_trusted_test_build",
1577        false,
1578        crate::core::trusted_test_build_flag,
1579    )
1580}
1581
1582/// Marketing version baked in at build time from `IRIS_APP_VERSION_NAME`
1583/// (or `IRIS_APP_VERSION`), falling back to the crate semver. Use this
1584/// instead of `env!("CARGO_PKG_VERSION")` so UI/release artifacts agree
1585/// on a single version string.
1586#[uniffi::export]
1587pub fn app_version() -> String {
1588    crate::core::app_version_string().to_string()
1589}
1590
1591#[uniffi::export]
1592pub fn resolve_mobile_push_notification_payload(
1593    raw_payload_json: String,
1594) -> MobilePushNotificationResolution {
1595    ffi_or(
1596        "resolve_mobile_push_notification_payload",
1597        suppressed_mobile_push_resolution(),
1598        || crate::core::resolve_mobile_push_notification(raw_payload_json),
1599    )
1600}
1601
1602/// Decrypt a notification payload against the persisted double-ratchet
1603/// state under `data_dir`. Use from the FCM service (Android) or
1604/// Notification Service Extension (iOS) where there's no live `FfiApp`.
1605/// Falls back to the generic resolver when keys, payload, or storage
1606/// are unavailable so the user still gets *some* notification.
1607#[uniffi::export]
1608pub fn decrypt_mobile_push_notification_payload(
1609    data_dir: String,
1610    owner_pubkey_hex: String,
1611    device_nsec: String,
1612    raw_payload_json: String,
1613) -> MobilePushNotificationResolution {
1614    ffi_or(
1615        "decrypt_mobile_push_notification_payload",
1616        suppressed_mobile_push_resolution(),
1617        || {
1618            crate::core::decrypt_mobile_push_notification(
1619                data_dir,
1620                owner_pubkey_hex,
1621                device_nsec,
1622                raw_payload_json,
1623            )
1624        },
1625    )
1626}
1627
1628#[uniffi::export]
1629pub fn resolve_mobile_push_subscription_server_url(
1630    platform_key: String,
1631    is_release: bool,
1632    override_url: Option<String>,
1633) -> String {
1634    ffi_or(
1635        "resolve_mobile_push_subscription_server_url",
1636        String::new(),
1637        || crate::core::resolve_mobile_push_server_url(platform_key, is_release, override_url),
1638    )
1639}
1640
1641#[uniffi::export]
1642pub fn mobile_push_subscription_id_key(platform_key: String) -> String {
1643    ffi_or("mobile_push_subscription_id_key", String::new(), || {
1644        crate::core::mobile_push_stored_subscription_id_key(platform_key)
1645    })
1646}
1647
1648#[uniffi::export]
1649pub fn build_mobile_push_list_subscriptions_request(
1650    owner_nsec: String,
1651    platform_key: String,
1652    is_release: bool,
1653    server_url_override: Option<String>,
1654) -> Option<MobilePushSubscriptionRequest> {
1655    ffi_or("build_mobile_push_list_subscriptions_request", None, || {
1656        crate::core::build_mobile_push_list_subscriptions_request(
1657            owner_nsec,
1658            platform_key,
1659            is_release,
1660            server_url_override,
1661        )
1662    })
1663}
1664
1665#[uniffi::export]
1666#[allow(clippy::too_many_arguments)]
1667pub fn build_mobile_push_create_subscription_request(
1668    owner_nsec: String,
1669    platform_key: String,
1670    push_token: String,
1671    apns_topic: Option<String>,
1672    message_author_pubkeys: Vec<String>,
1673    invite_response_pubkeys: Vec<String>,
1674    is_release: bool,
1675    server_url_override: Option<String>,
1676) -> Option<MobilePushSubscriptionRequest> {
1677    ffi_or(
1678        "build_mobile_push_create_subscription_request",
1679        None,
1680        || {
1681            crate::core::build_mobile_push_create_subscription_request(
1682                owner_nsec,
1683                platform_key,
1684                push_token,
1685                apns_topic,
1686                message_author_pubkeys,
1687                invite_response_pubkeys,
1688                is_release,
1689                server_url_override,
1690            )
1691        },
1692    )
1693}
1694
1695#[uniffi::export]
1696#[allow(clippy::too_many_arguments)]
1697pub fn build_mobile_push_update_subscription_request(
1698    owner_nsec: String,
1699    subscription_id: String,
1700    platform_key: String,
1701    push_token: String,
1702    apns_topic: Option<String>,
1703    message_author_pubkeys: Vec<String>,
1704    invite_response_pubkeys: Vec<String>,
1705    is_release: bool,
1706    server_url_override: Option<String>,
1707) -> Option<MobilePushSubscriptionRequest> {
1708    ffi_or(
1709        "build_mobile_push_update_subscription_request",
1710        None,
1711        || {
1712            crate::core::build_mobile_push_update_subscription_request(
1713                owner_nsec,
1714                subscription_id,
1715                platform_key,
1716                push_token,
1717                apns_topic,
1718                message_author_pubkeys,
1719                invite_response_pubkeys,
1720                is_release,
1721                server_url_override,
1722            )
1723        },
1724    )
1725}
1726
1727#[uniffi::export]
1728pub fn build_mobile_push_delete_subscription_request(
1729    owner_nsec: String,
1730    subscription_id: String,
1731    platform_key: String,
1732    is_release: bool,
1733    server_url_override: Option<String>,
1734) -> Option<MobilePushSubscriptionRequest> {
1735    ffi_or(
1736        "build_mobile_push_delete_subscription_request",
1737        None,
1738        || {
1739            crate::core::build_mobile_push_delete_subscription_request(
1740                owner_nsec,
1741                subscription_id,
1742                platform_key,
1743                is_release,
1744                server_url_override,
1745            )
1746        },
1747    )
1748}
1749
1750#[cfg(test)]
1751mod ffi_hardening_tests {
1752    use super::*;
1753
1754    #[test]
1755    fn ffi_guard_returns_fallback_after_panic() {
1756        let value = ffi_or("test.panic", 42, || -> i32 {
1757            panic!("ffi boom");
1758        });
1759
1760        assert_eq!(value, 42);
1761    }
1762
1763    #[test]
1764    fn core_batch_guard_converts_panic_to_error() {
1765        let result = catch_core_batch(|| -> bool {
1766            panic!("batch boom");
1767        });
1768
1769        assert_eq!(result, Err("batch boom".to_string()));
1770    }
1771
1772    #[test]
1773    fn core_batch_guard_preserves_success_result() {
1774        assert_eq!(catch_core_batch(|| true), Ok(true));
1775        assert_eq!(catch_core_batch(|| false), Ok(false));
1776    }
1777
1778    #[test]
1779    fn recovery_state_tracks_restore_action_and_logout() {
1780        let recovery = CoreRecoveryState::default();
1781        recovery.remember_action(&AppAction::RestoreSession {
1782            owner_nsec: "secret".to_string(),
1783        });
1784
1785        match recovery.restore_action() {
1786            Some(AppAction::RestoreSession { owner_nsec }) => assert_eq!(owner_nsec, "secret"),
1787            other => panic!("unexpected restore action: {other:?}"),
1788        }
1789
1790        recovery.remember_action(&AppAction::Logout);
1791        assert!(recovery.restore_action().is_none());
1792    }
1793
1794    #[test]
1795    fn recovery_state_tracks_persisted_account_bundle() {
1796        let recovery = CoreRecoveryState::default();
1797        recovery.remember_update(&AppUpdate::PersistAccountBundle {
1798            rev: 7,
1799            owner_nsec: None,
1800            owner_pubkey_hex: "owner".to_string(),
1801            device_nsec: "device-secret".to_string(),
1802        });
1803
1804        match recovery.restore_action() {
1805            Some(AppAction::RestoreAccountBundle {
1806                owner_nsec,
1807                owner_pubkey_hex,
1808                device_nsec,
1809            }) => {
1810                assert_eq!(owner_nsec, None);
1811                assert_eq!(owner_pubkey_hex, "owner");
1812                assert_eq!(device_nsec, "device-secret");
1813            }
1814            other => panic!("unexpected restore action: {other:?}"),
1815        }
1816    }
1817
1818    #[test]
1819    fn nearby_published_events_wait_behind_latest_state_in_drained_batch() {
1820        let mut latest_full_state = None;
1821        let mut before_full_state = Vec::new();
1822        let mut after_full_state = Vec::new();
1823
1824        enqueue_update_for_delivery(
1825            AppUpdate::NearbyPublishedEvent {
1826                event_id: "a".repeat(64),
1827                kind: 14,
1828                created_at_secs: 1,
1829                event_json: "{}".to_string(),
1830            },
1831            &mut latest_full_state,
1832            &mut before_full_state,
1833            &mut after_full_state,
1834        );
1835        let mut stale = AppState::empty();
1836        stale.rev = 1;
1837        enqueue_update_for_delivery(
1838            AppUpdate::FullState(stale),
1839            &mut latest_full_state,
1840            &mut before_full_state,
1841            &mut after_full_state,
1842        );
1843        enqueue_update_for_delivery(
1844            AppUpdate::PersistAccountBundle {
1845                rev: 2,
1846                owner_nsec: None,
1847                owner_pubkey_hex: "owner".to_string(),
1848                device_nsec: "device".to_string(),
1849            },
1850            &mut latest_full_state,
1851            &mut before_full_state,
1852            &mut after_full_state,
1853        );
1854        let mut latest = AppState::empty();
1855        latest.rev = 3;
1856        enqueue_update_for_delivery(
1857            AppUpdate::FullState(latest),
1858            &mut latest_full_state,
1859            &mut before_full_state,
1860            &mut after_full_state,
1861        );
1862
1863        let order = before_full_state
1864            .into_iter()
1865            .chain(latest_full_state)
1866            .chain(after_full_state)
1867            .map(|update| match update {
1868                AppUpdate::PersistAccountBundle { .. } => "persist".to_string(),
1869                AppUpdate::FullState(state) => format!("state:{}", state.rev),
1870                AppUpdate::NearbyPublishedEvent { .. } => "nearby".to_string(),
1871            })
1872            .collect::<Vec<_>>();
1873
1874        assert_eq!(order, vec!["persist", "state:3", "nearby"]);
1875    }
1876
1877    #[test]
1878    fn core_supervisor_recovers_after_batch_panic() {
1879        let temp_dir = tempfile::TempDir::new().expect("temp dir");
1880        let app = new_ffi_app_inner(temp_dir.path().to_string_lossy().to_string());
1881
1882        app.foreground_tx
1883            .send(CoreMsg::PanicForTest)
1884            .expect("send test panic");
1885
1886        for _ in 0..40 {
1887            if app.recovery.restart_count() > 0 {
1888                break;
1889            }
1890            thread::sleep(Duration::from_millis(25));
1891        }
1892        assert_eq!(app.recovery.restart_count(), 1);
1893
1894        let (reply_tx, reply_rx) = flume::bounded(1);
1895        app.foreground_tx
1896            .send(CoreMsg::CorePerfCounters(reply_tx))
1897            .expect("send post-recovery request");
1898        assert!(reply_rx.recv_timeout(Duration::from_secs(2)).is_ok());
1899
1900        app.shutdown();
1901    }
1902}