1mod actions;
2mod core;
3pub mod desktop_nearby;
4pub mod image_proxy;
5pub mod local_relay;
6pub mod perflog;
7mod qr;
8mod state;
9mod test_fixtures;
10mod updates;
11
12use std::any::Any;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::{Arc, RwLock};
15use std::thread;
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17use std::{panic, panic::AssertUnwindSafe};
18
19use flume::{Receiver, Sender};
20
21pub use actions::AppAction;
22pub use qr::*;
23pub use state::*;
24pub use test_fixtures::*;
25pub use updates::*;
26
27use crate::core::AppCore;
28
29uniffi::setup_scaffolding!();
30
31#[uniffi::export(callback_interface)]
32pub trait AppReconciler: Send + Sync + 'static {
33 fn reconcile(&self, update: AppUpdate);
34}
35
36#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq)]
37pub struct DesktopNearbyPeerSnapshot {
38 pub id: String,
39 pub name: String,
40 pub owner_pubkey_hex: Option<String>,
41 pub picture_url: Option<String>,
42 pub profile_event_id: Option<String>,
43 pub last_seen_secs: u64,
44}
45
46#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq)]
47pub struct DesktopNearbySnapshot {
48 pub visible: bool,
49 pub status: String,
50 pub peers: Vec<DesktopNearbyPeerSnapshot>,
51}
52
53#[uniffi::export(callback_interface)]
54pub trait DesktopNearbyObserver: Send + Sync + 'static {
55 fn desktop_nearby_changed(&self, snapshot: DesktopNearbySnapshot);
56}
57
58#[derive(Default, Debug)]
71pub(crate) struct FfiPerfCounters {
72 pub state: AtomicU64,
73 pub dispatch: AtomicU64,
74 pub search: AtomicU64,
75 pub ingest_nearby_event_json: AtomicU64,
76 pub export_support_bundle_json: AtomicU64,
77 pub peer_profile_debug: AtomicU64,
78 pub prepare_for_suspend: AtomicU64,
79}
80
81#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq, Default)]
82pub struct FfiPerfCountersSnapshot {
83 pub state: u64,
84 pub dispatch: u64,
85 pub search: u64,
86 pub ingest_nearby_event_json: u64,
87 pub export_support_bundle_json: u64,
88 pub peer_profile_debug: u64,
89 pub prepare_for_suspend: u64,
90}
91
92#[derive(uniffi::Record, Clone, Debug, PartialEq, Eq, Default)]
101pub struct CorePerfCountersSnapshot {
102 pub debug_snapshot_builds: u64,
103}
104
105#[derive(uniffi::Object)]
106pub struct FfiApp {
107 foreground_tx: Sender<CoreMsg>,
108 foreground_rx: Receiver<CoreMsg>,
109 background_tx: Sender<CoreMsg>,
110 background_rx: Receiver<CoreMsg>,
111 update_rx: Receiver<AppUpdate>,
112 listening: AtomicBool,
113 shared_state: Arc<RwLock<AppState>>,
114 shared_db: Option<crate::core::SharedConnection>,
118 perf: FfiPerfCounters,
119 queue_metrics: Arc<CoreQueueMetrics>,
120}
121
122#[derive(Default, Debug)]
123struct CoreQueueMetrics {
124 foreground_processed: AtomicU64,
125 background_processed: AtomicU64,
126 batch_active: AtomicBool,
127 last_batch_started_at_ms: AtomicU64,
128 last_batch_finished_at_ms: AtomicU64,
129 last_batch_size: AtomicU64,
130 last_batch_foreground_count: AtomicU64,
131 last_batch_background_count: AtomicU64,
132}
133
134impl CoreQueueMetrics {
135 fn mark_batch_start(&self, size: u64, foreground: u64, background: u64) {
136 self.last_batch_started_at_ms
137 .store(crate::perflog::now_ms(), Ordering::Relaxed);
138 self.last_batch_size.store(size, Ordering::Relaxed);
139 self.last_batch_foreground_count
140 .store(foreground, Ordering::Relaxed);
141 self.last_batch_background_count
142 .store(background, Ordering::Relaxed);
143 self.batch_active.store(true, Ordering::Release);
144 }
145
146 fn mark_batch_finished(&self, foreground: u64, background: u64) {
147 self.foreground_processed
148 .fetch_add(foreground, Ordering::Relaxed);
149 self.background_processed
150 .fetch_add(background, Ordering::Relaxed);
151 self.last_batch_finished_at_ms
152 .store(crate::perflog::now_ms(), Ordering::Relaxed);
153 self.batch_active.store(false, Ordering::Release);
154 }
155}
156
157#[derive(uniffi::Object)]
158pub struct FfiDesktopNearby {
159 service: Arc<desktop_nearby::DesktopNearbyService>,
160}
161
162#[uniffi::export]
163impl FfiApp {
164 #[uniffi::constructor]
165 pub fn new(data_dir: String, _keychain_group: String, _app_version: String) -> Arc<Self> {
166 match panic::catch_unwind(AssertUnwindSafe(|| new_ffi_app_inner(data_dir))) {
167 Ok(app) => app,
168 Err(payload) => ffi_app_failure(format!(
169 "Iris could not start: {}",
170 panic_payload_to_string(payload)
171 )),
172 }
173 }
174
175 pub fn state(&self) -> AppState {
176 self.perf.state.fetch_add(1, Ordering::Relaxed);
177 ffi_or("ffiapp.state", ffi_failure_state(), || {
178 match self.shared_state.read() {
179 Ok(slot) => slot.clone(),
180 Err(poison) => poison.into_inner().clone(),
181 }
182 })
183 }
184
185 pub fn dispatch(&self, action: AppAction) {
186 self.perf.dispatch.fetch_add(1, Ordering::Relaxed);
187 ffi_or("ffiapp.dispatch", (), || {
188 crate::perflog!("ffi.dispatch action={:?}", std::mem::discriminant(&action));
189 let _ = self.foreground_tx.send(CoreMsg::Action(action));
190 })
191 }
192
193 pub fn perf_counters(&self) -> FfiPerfCountersSnapshot {
199 FfiPerfCountersSnapshot {
200 state: self.perf.state.load(Ordering::Relaxed),
201 dispatch: self.perf.dispatch.load(Ordering::Relaxed),
202 search: self.perf.search.load(Ordering::Relaxed),
203 ingest_nearby_event_json: self.perf.ingest_nearby_event_json.load(Ordering::Relaxed),
204 export_support_bundle_json: self
205 .perf
206 .export_support_bundle_json
207 .load(Ordering::Relaxed),
208 peer_profile_debug: self.perf.peer_profile_debug.load(Ordering::Relaxed),
209 prepare_for_suspend: self.perf.prepare_for_suspend.load(Ordering::Relaxed),
210 }
211 }
212
213 pub fn core_perf_counters(&self) -> CorePerfCountersSnapshot {
219 ffi_or(
220 "ffiapp.core_perf_counters",
221 CorePerfCountersSnapshot::default(),
222 || {
223 let (reply_tx, reply_rx) = flume::bounded(1);
224 if self
225 .foreground_tx
226 .send(CoreMsg::CorePerfCounters(reply_tx))
227 .is_err()
228 {
229 return CorePerfCountersSnapshot::default();
230 }
231 match reply_rx.recv_timeout(Duration::from_secs(2)) {
232 Ok(snapshot) => CorePerfCountersSnapshot {
233 debug_snapshot_builds: snapshot.debug_snapshot_builds,
234 },
235 Err(_) => CorePerfCountersSnapshot::default(),
236 }
237 },
238 )
239 }
240
241 pub fn search(
248 &self,
249 query: String,
250 scope_chat_id: Option<String>,
251 limit: u32,
252 ) -> SearchResultSnapshot {
253 self.perf.search.fetch_add(1, Ordering::Relaxed);
254 ffi_or(
255 "ffiapp.search",
256 SearchResultSnapshot::empty(query.clone(), scope_chat_id.clone()),
257 || {
258 let trimmed = query.trim();
259 if trimmed.is_empty() {
260 return SearchResultSnapshot::empty(query.clone(), scope_chat_id.clone());
261 }
262 let limit = limit.max(1) as usize;
263 let state_snapshot = match self.shared_state.read() {
264 Ok(slot) => slot.clone(),
265 Err(poison) => poison.into_inner().clone(),
266 };
267 let (contacts, groups) = if scope_chat_id.is_some() {
268 (Vec::new(), Vec::new())
269 } else {
270 filter_threads_for_search(&state_snapshot.chat_list, trimmed)
271 };
272 let messages = match self.shared_db.as_ref() {
273 Some(shared) => match shared.lock() {
274 Ok(conn) => crate::core::search_messages_fts(
275 &conn,
276 trimmed,
277 scope_chat_id.as_deref(),
278 limit,
279 )
280 .unwrap_or_default(),
281 Err(poison) => crate::core::search_messages_fts(
282 &poison.into_inner(),
283 trimmed,
284 scope_chat_id.as_deref(),
285 limit,
286 )
287 .unwrap_or_default(),
288 },
289 None => Vec::new(),
290 };
291 let enriched = enrich_message_hits(messages, &state_snapshot.chat_list);
292 let shortcut = if scope_chat_id.is_none() {
297 chat_input_shortcut(trimmed)
298 } else {
299 None
300 };
301 SearchResultSnapshot {
302 query,
303 scope_chat_id,
304 contacts,
305 groups,
306 messages: enriched,
307 shortcut,
308 }
309 },
310 )
311 }
312
313 pub fn chat_snapshot(&self, chat_id: String, limit: u32) -> Option<CurrentChatSnapshot> {
319 ffi_or("ffiapp.chat_snapshot", None, || {
320 let state_snapshot = match self.shared_state.read() {
321 Ok(slot) => slot.clone(),
322 Err(poison) => poison.into_inner().clone(),
323 };
324 crate::core::chat_snapshot_from_state_and_db(
325 &state_snapshot,
326 self.shared_db.as_ref(),
327 &chat_id,
328 limit.max(1) as usize,
329 )
330 })
331 }
332
333 pub fn chat_snapshot_before(
334 &self,
335 chat_id: String,
336 before_message_id: String,
337 limit: u32,
338 ) -> Option<CurrentChatSnapshot> {
339 ffi_or("ffiapp.chat_snapshot_before", None, || {
340 let state_snapshot = match self.shared_state.read() {
341 Ok(slot) => slot.clone(),
342 Err(poison) => poison.into_inner().clone(),
343 };
344 crate::core::chat_snapshot_before_from_state_and_db(
345 &state_snapshot,
346 self.shared_db.as_ref(),
347 &chat_id,
348 &before_message_id,
349 limit.max(1) as usize,
350 )
351 })
352 }
353
354 pub fn chat_snapshot_around_message(
355 &self,
356 chat_id: String,
357 message_id: String,
358 before_limit: u32,
359 after_limit: u32,
360 ) -> Option<CurrentChatSnapshot> {
361 ffi_or("ffiapp.chat_snapshot_around_message", None, || {
362 let state_snapshot = match self.shared_state.read() {
363 Ok(slot) => slot.clone(),
364 Err(poison) => poison.into_inner().clone(),
365 };
366 crate::core::chat_snapshot_around_message_from_state_and_db(
367 &state_snapshot,
368 self.shared_db.as_ref(),
369 &chat_id,
370 &message_id,
371 before_limit as usize,
372 after_limit as usize,
373 )
374 })
375 }
376
377 pub fn ingest_nearby_event_json(&self, event_json: String) -> bool {
378 self.perf
379 .ingest_nearby_event_json
380 .fetch_add(1, Ordering::Relaxed);
381 self.ingest_nearby_event_json_with_transport(event_json, String::new())
382 }
383
384 pub fn ingest_nearby_event_json_with_transport(
385 &self,
386 event_json: String,
387 transport: String,
388 ) -> bool {
389 ffi_or("ffiapp.ingest_nearby_event_json", false, || {
390 let event = match serde_json::from_str::<nostr_sdk::prelude::Event>(&event_json) {
391 Ok(event) => event,
392 Err(_) => return false,
393 };
394 if event.verify().is_err() {
395 return false;
396 }
397 self.background_tx
398 .send(CoreMsg::Internal(Box::new(InternalEvent::NearbyEvent {
399 event,
400 transport,
401 })))
402 .is_ok()
403 })
404 }
405
406 pub fn build_nearby_presence_event_json(
407 &self,
408 peer_id: String,
409 my_nonce: String,
410 their_nonce: String,
411 profile_event_id: String,
412 ) -> String {
413 ffi_or(
414 "ffiapp.build_nearby_presence_event_json",
415 String::new(),
416 || {
417 let (reply_tx, reply_rx) = flume::bounded(1);
418 if self
419 .background_tx
420 .send(CoreMsg::BuildNearbyPresenceEvent {
421 peer_id,
422 my_nonce,
423 their_nonce,
424 profile_event_id,
425 reply_tx,
426 })
427 .is_err()
428 {
429 return String::new();
430 }
431 reply_rx
432 .recv_timeout(Duration::from_secs(2))
433 .unwrap_or_default()
434 },
435 )
436 }
437
438 pub fn verify_nearby_presence_event_json(
439 &self,
440 event_json: String,
441 peer_id: String,
442 my_nonce: String,
443 their_nonce: String,
444 ) -> String {
445 ffi_or(
446 "ffiapp.verify_nearby_presence_event_json",
447 String::new(),
448 || verify_nearby_presence_event_json(&event_json, &peer_id, &my_nonce, &their_nonce),
449 )
450 }
451
452 pub fn nearby_encode_frame(&self, envelope_json: String) -> Vec<u8> {
453 ffi_or("ffiapp.nearby_encode_frame", Vec::new(), || {
454 nostr_double_ratchet_runtime::encode_nearby_frame_json(&envelope_json)
455 .unwrap_or_default()
456 })
457 }
458
459 pub fn nearby_decode_frame(&self, frame: Vec<u8>) -> String {
460 ffi_or("ffiapp.nearby_decode_frame", String::new(), || {
461 nostr_double_ratchet_runtime::decode_nearby_frame_json(&frame).unwrap_or_default()
462 })
463 }
464
465 pub fn nearby_frame_body_len_from_header(&self, header: Vec<u8>) -> i32 {
466 ffi_or("ffiapp.nearby_frame_body_len_from_header", -1, || {
467 nostr_double_ratchet_runtime::nearby_frame_body_len_from_header(&header)
468 .and_then(|len| i32::try_from(len).ok())
469 .unwrap_or(-1)
470 })
471 }
472
473 pub fn export_support_bundle_json(&self) -> String {
474 self.perf
475 .export_support_bundle_json
476 .fetch_add(1, Ordering::Relaxed);
477 ffi_or(
478 "ffiapp.export_support_bundle_json",
479 self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true),
480 || {
481 let (reply_tx, reply_rx) = flume::bounded(1);
482 if self
483 .foreground_tx
484 .send(CoreMsg::ExportSupportBundle(reply_tx))
485 .is_err()
486 {
487 return self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true);
488 }
489 match reply_rx.recv_timeout(Duration::from_secs(2)) {
490 Ok(json) => self.support_bundle_json_with_ffi_diagnostics(json, false),
491 Err(_) => self.support_bundle_json_with_ffi_diagnostics("{}".to_string(), true),
492 }
493 },
494 )
495 }
496
497 pub fn peer_profile_debug(&self, owner_input: String) -> Option<PeerProfileDebugSnapshot> {
498 self.perf.peer_profile_debug.fetch_add(1, Ordering::Relaxed);
499 ffi_or("ffiapp.peer_profile_debug", None, || {
500 let (reply_tx, reply_rx) = flume::bounded(1);
501 if self
502 .foreground_tx
503 .send(CoreMsg::PeerProfileDebug {
504 owner_input,
505 reply_tx,
506 })
507 .is_err()
508 {
509 return None;
510 }
511 reply_rx.recv_timeout(Duration::from_secs(2)).ok().flatten()
512 })
513 }
514
515 pub fn prepare_for_suspend(&self) {
516 self.perf
517 .prepare_for_suspend
518 .fetch_add(1, Ordering::Relaxed);
519 ffi_or("ffiapp.prepare_for_suspend", (), || {
520 let (reply_tx, reply_rx) = flume::bounded(1);
521 if self
522 .foreground_tx
523 .send(CoreMsg::PrepareForSuspend(reply_tx))
524 .is_err()
525 {
526 return;
527 }
528 let _ = reply_rx.recv_timeout(Duration::from_secs(2));
529 })
530 }
531
532 pub fn shutdown(&self) {
533 ffi_or("ffiapp.shutdown", (), || {
534 let (reply_tx, reply_rx) = flume::bounded(1);
535 if self
536 .foreground_tx
537 .send(CoreMsg::Shutdown(Some(reply_tx)))
538 .is_err()
539 {
540 return;
541 }
542 let _ = reply_rx.recv_timeout(Duration::from_secs(2));
543 })
544 }
545
546 fn support_bundle_json_with_ffi_diagnostics(
547 &self,
548 rust_json: String,
549 core_support_bundle_timed_out: bool,
550 ) -> String {
551 let mut object = serde_json::from_str::<serde_json::Value>(&rust_json)
552 .ok()
553 .and_then(|value| value.as_object().cloned())
554 .unwrap_or_default();
555 let now_ms = crate::perflog::now_ms();
556 let last_started_at_ms = self
557 .queue_metrics
558 .last_batch_started_at_ms
559 .load(Ordering::Relaxed);
560 let last_finished_at_ms = self
561 .queue_metrics
562 .last_batch_finished_at_ms
563 .load(Ordering::Relaxed);
564 let batch_active = self.queue_metrics.batch_active.load(Ordering::Acquire);
565 let active_batch_age_ms = if batch_active && last_started_at_ms > 0 {
566 Some(now_ms.saturating_sub(last_started_at_ms))
567 } else {
568 None
569 };
570 let last_batch_started_ago_ms = if last_started_at_ms > 0 {
571 Some(now_ms.saturating_sub(last_started_at_ms))
572 } else {
573 None
574 };
575 let last_batch_finished_ago_ms = if last_finished_at_ms > 0 {
576 Some(now_ms.saturating_sub(last_finished_at_ms))
577 } else {
578 None
579 };
580 object.insert(
581 "ffi_queue".to_string(),
582 serde_json::json!({
583 "core_support_bundle_timed_out": core_support_bundle_timed_out,
584 "foreground_pending": self.foreground_rx.len(),
585 "background_pending": self.background_rx.len(),
586 "foreground_processed": self.queue_metrics.foreground_processed.load(Ordering::Relaxed),
587 "background_processed": self.queue_metrics.background_processed.load(Ordering::Relaxed),
588 "batch_active": batch_active,
589 "active_batch_age_ms": active_batch_age_ms,
590 "last_batch_started_ago_ms": last_batch_started_ago_ms,
591 "last_batch_finished_ago_ms": last_batch_finished_ago_ms,
592 "last_batch_size": self.queue_metrics.last_batch_size.load(Ordering::Relaxed),
593 "last_batch_foreground_count": self.queue_metrics.last_batch_foreground_count.load(Ordering::Relaxed),
594 "last_batch_background_count": self.queue_metrics.last_batch_background_count.load(Ordering::Relaxed),
595 }),
596 );
597 serde_json::Value::Object(object).to_string()
598 }
599
600 pub fn listen_for_updates(&self, reconciler: Box<dyn AppReconciler>) {
601 if self
602 .listening
603 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
604 .is_err()
605 {
606 return;
607 }
608
609 let update_rx = self.update_rx.clone();
610 let spawn_result = thread::Builder::new()
611 .name("iris-updates".to_string())
612 .spawn(move || {
613 while let Ok(first) = update_rx.recv() {
625 let mut latest_full_state: Option<AppUpdate> = None;
626 let mut sidecar: Vec<AppUpdate> = Vec::new();
627 let process =
628 |update: AppUpdate,
629 latest: &mut Option<AppUpdate>,
630 side: &mut Vec<AppUpdate>| match update {
631 full @ AppUpdate::FullState(_) => *latest = Some(full),
632 other => side.push(other),
633 };
634 process(first, &mut latest_full_state, &mut sidecar);
635 while let Ok(next) = update_rx.try_recv() {
636 process(next, &mut latest_full_state, &mut sidecar);
637 }
638 for update in sidecar.into_iter().chain(latest_full_state) {
639 let kind = match &update {
640 AppUpdate::FullState(_) => "FullState",
641 AppUpdate::PersistAccountBundle { .. } => "PersistAccountBundle",
642 AppUpdate::NearbyPublishedEvent { .. } => "NearbyPublishedEvent",
643 };
644 let t0 = crate::perflog::now_ms();
645 crate::perflog!("reconcile.start kind={kind}");
646 if panic::catch_unwind(AssertUnwindSafe(|| reconciler.reconcile(update)))
647 .is_err()
648 {
649 crate::perflog!("reconcile.failed kind={kind}");
650 continue;
651 }
652 crate::perflog!(
653 "reconcile.end kind={kind} elapsed_ms={}",
654 crate::perflog::now_ms().saturating_sub(t0)
655 );
656 }
657 }
658 });
659 if let Err(error) = spawn_result {
660 crate::perflog!("updates.spawn.failed error={error}");
661 self.listening.store(false, Ordering::SeqCst);
662 }
663 }
664}
665
666#[uniffi::export]
667impl FfiDesktopNearby {
668 #[uniffi::constructor]
669 pub fn new(app: Arc<FfiApp>, observer: Box<dyn DesktopNearbyObserver>) -> Arc<Self> {
670 Arc::new(Self {
671 service: desktop_nearby::DesktopNearbyService::new(app, observer.into()),
672 })
673 }
674
675 pub fn start(&self, local_name: String) {
676 self.service.start(local_name);
677 }
678
679 pub fn stop(&self) {
680 self.service.stop();
681 }
682
683 pub fn snapshot(&self) -> DesktopNearbySnapshot {
684 self.service.snapshot()
685 }
686
687 pub fn publish(&self, event_id: String, kind: u32, created_at_secs: u64, event_json: String) {
688 self.service
689 .publish(event_id, kind, created_at_secs, event_json);
690 }
691}
692
693fn new_ffi_app_inner(data_dir: String) -> Arc<FfiApp> {
694 let (update_tx, update_rx) = flume::unbounded();
695 let (foreground_tx, foreground_rx) = flume::unbounded();
696 let (background_tx, background_rx) = flume::unbounded();
697 let shared_state = Arc::new(RwLock::new(AppState::empty()));
698 let queue_metrics = Arc::new(CoreQueueMetrics::default());
699
700 let core_tx_for_thread = background_tx.clone();
701 let shared_for_thread = shared_state.clone();
702 let update_tx_for_error = update_tx.clone();
703 let mut shared_db = None;
704 match AppCore::try_new(update_tx, core_tx_for_thread, data_dir, shared_for_thread) {
705 Ok(mut core) => {
706 shared_db = Some(core.shared_db());
707 let queue_metrics_for_thread = queue_metrics.clone();
708 let foreground_rx_for_thread = foreground_rx.clone();
709 let background_rx_for_thread = background_rx.clone();
710 let spawn_result =
711 thread::Builder::new()
712 .name("iris-core".to_string())
713 .spawn(move || {
714 while let Ok(batch) =
719 recv_core_batch(&foreground_rx_for_thread, &background_rx_for_thread)
720 {
721 let batch_size = batch.len();
722 let foreground_count = batch
723 .iter()
724 .filter(|msg| is_foreground_core_msg(msg))
725 .count() as u64;
726 let background_count = batch_size as u64 - foreground_count;
727 queue_metrics_for_thread.mark_batch_start(
728 batch_size as u64,
729 foreground_count,
730 background_count,
731 );
732 let t0 = crate::perflog::now_ms();
733 crate::perflog!("core.batch.start size={batch_size}");
734 match catch_core_batch(|| {
735 handle_core_batch_responsive(&mut core, batch)
736 }) {
737 Ok(true) => {}
738 Ok(false) => break,
739 Err(error) => {
740 core.mark_core_panic(error);
741 break;
742 }
743 }
744 crate::perflog!(
745 "core.batch.end size={batch_size} elapsed_ms={}",
746 crate::perflog::now_ms().saturating_sub(t0)
747 );
748 queue_metrics_for_thread
749 .mark_batch_finished(foreground_count, background_count);
750 }
751 });
752 if let Err(error) = spawn_result {
753 let mut state = AppState::empty();
754 state.toast = Some(format!("Iris could not start: {error}"));
755 state.rev = 1;
756 match shared_state.write() {
757 Ok(mut slot) => *slot = state.clone(),
758 Err(poison) => *poison.into_inner() = state.clone(),
759 }
760 let _ = update_tx_for_error.send(AppUpdate::FullState(state));
761 }
762 }
763 Err(error) => {
764 let mut state = AppState::empty();
765 state.toast = Some(error.to_string());
766 state.rev = 1;
767 match shared_state.write() {
768 Ok(mut slot) => *slot = state.clone(),
769 Err(poison) => *poison.into_inner() = state.clone(),
770 }
771 let _ = update_tx_for_error.send(AppUpdate::FullState(state));
772 }
773 }
774
775 Arc::new(FfiApp {
776 foreground_tx,
777 foreground_rx,
778 background_tx,
779 background_rx,
780 update_rx,
781 listening: AtomicBool::new(false),
782 shared_state,
783 shared_db,
784 perf: FfiPerfCounters::default(),
785 queue_metrics,
786 })
787}
788
789fn ffi_app_failure(message: String) -> Arc<FfiApp> {
790 let (_update_tx, update_rx) = flume::unbounded();
791 let (foreground_tx, foreground_rx) = flume::unbounded();
792 let (background_tx, background_rx) = flume::unbounded();
793 let mut state = AppState::empty();
794 state.toast = Some(message);
795 state.rev = 1;
796 let shared_state = Arc::new(RwLock::new(state));
797 Arc::new(FfiApp {
798 foreground_tx,
799 foreground_rx,
800 background_tx,
801 background_rx,
802 update_rx,
803 listening: AtomicBool::new(false),
804 shared_state,
805 shared_db: None,
806 perf: FfiPerfCounters::default(),
807 queue_metrics: Arc::new(CoreQueueMetrics::default()),
808 })
809}
810
811const CORE_FOREGROUND_BATCH_LIMIT: usize = 64;
812const CORE_BACKGROUND_BATCH_LIMIT: usize = 16;
813
814fn recv_core_batch(
815 foreground_rx: &Receiver<CoreMsg>,
816 background_rx: &Receiver<CoreMsg>,
817) -> Result<Vec<CoreMsg>, flume::RecvError> {
818 if let Some(batch) = try_recv_core_batch(foreground_rx, background_rx) {
819 return Ok(batch);
820 }
821
822 let (is_foreground, first) = flume::Selector::new()
823 .recv(foreground_rx, |result| result.map(|msg| (true, msg)))
824 .recv(background_rx, |result| result.map(|msg| (false, msg)))
825 .wait()?;
826 Ok(drain_core_batch_after_first(
827 is_foreground,
828 first,
829 foreground_rx,
830 background_rx,
831 ))
832}
833
834fn try_recv_core_batch(
835 foreground_rx: &Receiver<CoreMsg>,
836 background_rx: &Receiver<CoreMsg>,
837) -> Option<Vec<CoreMsg>> {
838 if let Ok(first) = foreground_rx.try_recv() {
839 return Some(drain_core_batch_after_first(
840 true,
841 first,
842 foreground_rx,
843 background_rx,
844 ));
845 }
846 background_rx
847 .try_recv()
848 .ok()
849 .map(|first| drain_core_batch_after_first(false, first, foreground_rx, background_rx))
850}
851
852fn drain_core_batch_after_first(
853 is_foreground: bool,
854 first: CoreMsg,
855 foreground_rx: &Receiver<CoreMsg>,
856 background_rx: &Receiver<CoreMsg>,
857) -> Vec<CoreMsg> {
858 let mut batch = Vec::with_capacity(if is_foreground {
859 CORE_FOREGROUND_BATCH_LIMIT
860 } else {
861 CORE_BACKGROUND_BATCH_LIMIT
862 });
863 batch.push(first);
864 drain_foreground_messages(&mut batch, foreground_rx);
865 if batch.iter().any(is_foreground_core_msg) {
866 return batch;
867 }
868
869 while batch.len() < CORE_BACKGROUND_BATCH_LIMIT {
870 let Ok(next) = background_rx.try_recv() else {
871 break;
872 };
873 batch.push(next);
874 drain_foreground_messages(&mut batch, foreground_rx);
875 if batch.iter().any(is_foreground_core_msg) {
876 break;
877 }
878 }
879 batch
880}
881
882fn drain_foreground_messages(batch: &mut Vec<CoreMsg>, foreground_rx: &Receiver<CoreMsg>) {
883 while batch.len() < CORE_FOREGROUND_BATCH_LIMIT {
884 let Ok(next) = foreground_rx.try_recv() else {
885 break;
886 };
887 batch.push(next);
888 }
889}
890
891fn handle_core_batch_responsive(core: &mut AppCore, messages: Vec<CoreMsg>) -> bool {
892 if messages.len() <= 1 || !messages.iter().any(is_foreground_core_msg) {
893 return core.handle_messages(messages);
894 }
895
896 let mut foreground = Vec::new();
897 let mut background = Vec::new();
898 for message in messages {
899 if is_foreground_core_msg(&message) {
900 foreground.push(message);
901 } else {
902 background.push(message);
903 }
904 }
905
906 for message in foreground {
907 if !core.handle_message(message) {
908 return false;
909 }
910 }
911 background.is_empty() || core.handle_messages(background)
912}
913
914fn catch_core_batch<F>(f: F) -> Result<bool, String>
915where
916 F: FnOnce() -> bool,
917{
918 panic::catch_unwind(AssertUnwindSafe(f)).map_err(panic_payload_to_string)
919}
920
921fn ffi_or<T, F>(label: &'static str, fallback: T, f: F) -> T
922where
923 F: FnOnce() -> T,
924{
925 match panic::catch_unwind(AssertUnwindSafe(f)) {
926 Ok(value) => value,
927 Err(payload) => {
928 crate::perflog!(
929 "ffi.panic label={label} detail={}",
930 panic_payload_to_string(payload)
931 );
932 fallback
933 }
934 }
935}
936
937fn ffi_failure_state() -> AppState {
938 let mut state = AppState::empty();
939 state.toast = Some("Iris needs restart. Copy support bundle in Settings.".to_string());
940 state
941}
942
943fn suppressed_mobile_push_resolution() -> MobilePushNotificationResolution {
944 MobilePushNotificationResolution {
945 should_show: false,
946 title: String::new(),
947 body: String::new(),
948 payload_json: "{}".to_string(),
949 }
950}
951
952fn panic_payload_to_string(payload: Box<dyn Any + Send>) -> String {
953 if let Some(message) = payload.downcast_ref::<&str>() {
954 (*message).to_string()
955 } else if let Some(message) = payload.downcast_ref::<String>() {
956 message.clone()
957 } else {
958 "unknown panic".to_string()
959 }
960}
961
962fn is_foreground_core_msg(message: &CoreMsg) -> bool {
963 !matches!(message, CoreMsg::Internal(_))
964}
965
966#[cfg(test)]
967mod core_queue_tests {
968 use super::*;
969
970 fn background_msg(index: usize) -> CoreMsg {
971 CoreMsg::Internal(Box::new(InternalEvent::DebugLog {
972 category: "test.background".to_string(),
973 detail: index.to_string(),
974 }))
975 }
976
977 #[test]
978 fn foreground_queue_preempts_background_backlog() {
979 let (foreground_tx, foreground_rx) = flume::unbounded();
980 let (background_tx, background_rx) = flume::unbounded();
981 for index in 0..100 {
982 background_tx.send(background_msg(index)).unwrap();
983 }
984 foreground_tx
985 .send(CoreMsg::Action(AppAction::NavigateBack))
986 .unwrap();
987
988 let batch = recv_core_batch(&foreground_rx, &background_rx).unwrap();
989
990 assert!(matches!(
991 batch.first(),
992 Some(CoreMsg::Action(AppAction::NavigateBack))
993 ));
994 assert!(
995 batch.iter().all(is_foreground_core_msg),
996 "foreground work should not be bundled behind background backlog"
997 );
998 }
999
1000 #[test]
1001 fn background_queue_drains_in_bounded_chunks() {
1002 let (_foreground_tx, foreground_rx) = flume::unbounded();
1003 let (background_tx, background_rx) = flume::unbounded();
1004 for index in 0..100 {
1005 background_tx.send(background_msg(index)).unwrap();
1006 }
1007
1008 let batch = recv_core_batch(&foreground_rx, &background_rx).unwrap();
1009
1010 assert_eq!(batch.len(), CORE_BACKGROUND_BATCH_LIMIT);
1011 assert!(batch.iter().all(|msg| !is_foreground_core_msg(msg)));
1012 }
1013
1014 #[test]
1015 fn route_chat_snapshot_uses_chat_list_without_core_queue() {
1016 let state = build_large_test_app_state(80, 20, 1_200);
1017 let chat_id = state.chat_list[10].chat_id.clone();
1018
1019 let snapshot =
1020 crate::core::chat_snapshot_from_state_and_db(&state, None, &chat_id, 80).unwrap();
1021
1022 assert_eq!(snapshot.chat_id, chat_id);
1023 assert_eq!(snapshot.display_name, state.chat_list[10].display_name);
1024 assert!(snapshot.messages.is_empty());
1025 }
1026
1027 #[test]
1028 fn route_chat_snapshot_requires_account() {
1029 let mut state = build_large_test_app_state(80, 20, 1_200);
1030 state.account = None;
1031 let chat_id = state.chat_list[10].chat_id.clone();
1032
1033 assert!(crate::core::chat_snapshot_from_state_and_db(&state, None, &chat_id, 80).is_none());
1034 }
1035}
1036
1037fn filter_threads_for_search(
1038 chat_list: &[ChatThreadSnapshot],
1039 query: &str,
1040) -> (Vec<ChatThreadSnapshot>, Vec<ChatThreadSnapshot>) {
1041 let needle = query.to_lowercase();
1042 let mut contacts = Vec::new();
1043 let mut groups = Vec::new();
1044 for chat in chat_list {
1045 if !thread_matches_query(chat, &needle) {
1046 continue;
1047 }
1048 match chat.kind {
1049 ChatKind::Direct => contacts.push(chat.clone()),
1050 ChatKind::Group => groups.push(chat.clone()),
1051 }
1052 }
1053 (contacts, groups)
1054}
1055
1056fn thread_matches_query(chat: &ChatThreadSnapshot, needle_lower: &str) -> bool {
1057 let candidates: [&str; 3] = [
1058 &chat.display_name,
1059 chat.subtitle.as_deref().unwrap_or(""),
1060 &chat.chat_id,
1061 ];
1062 candidates
1063 .iter()
1064 .any(|field| field.to_lowercase().contains(needle_lower))
1065}
1066
1067fn enrich_message_hits(
1068 hits: Vec<crate::core::PersistedMessageSearchHit>,
1069 chat_list: &[ChatThreadSnapshot],
1070) -> Vec<MessageSearchHit> {
1071 use std::collections::HashMap;
1072 let lookup: HashMap<&str, &ChatThreadSnapshot> = chat_list
1073 .iter()
1074 .map(|chat| (chat.chat_id.as_str(), chat))
1075 .collect();
1076 hits.into_iter()
1077 .map(|hit| {
1078 let parent = lookup.get(hit.chat_id.as_str());
1079 let display_name = parent
1080 .map(|chat| chat.display_name.clone())
1081 .unwrap_or_else(|| short_chat_label(&hit.chat_id));
1082 let picture_url = parent.and_then(|chat| chat.picture_url.clone());
1083 let kind = parent
1084 .map(|chat| chat.kind.clone())
1085 .unwrap_or(ChatKind::Direct);
1086 MessageSearchHit {
1087 chat_id: hit.chat_id,
1088 message_id: hit.message_id,
1089 chat_display_name: display_name,
1090 chat_picture_url: picture_url,
1091 chat_kind: kind,
1092 author_pubkey: hit.author,
1093 body: hit.body,
1094 is_outgoing: hit.is_outgoing,
1095 created_at_secs: hit.created_at_secs,
1096 }
1097 })
1098 .collect()
1099}
1100
1101fn short_chat_label(chat_id: &str) -> String {
1102 let trimmed = chat_id.trim();
1103 if trimmed.len() > 12 {
1104 format!("{}…", &trimmed[..12])
1105 } else {
1106 trimmed.to_string()
1107 }
1108}
1109
1110fn verify_nearby_presence_event_json(
1111 event_json: &str,
1112 peer_id: &str,
1113 my_nonce: &str,
1114 their_nonce: &str,
1115) -> String {
1116 let Ok(event) = serde_json::from_str::<nostr_sdk::prelude::Event>(event_json) else {
1117 return String::new();
1118 };
1119 if event.verify().is_err() || event.kind.as_u16() != crate::core::NEARBY_PRESENCE_KIND {
1120 return String::new();
1121 }
1122 let Ok(content) = serde_json::from_str::<serde_json::Value>(&event.content) else {
1123 return String::new();
1124 };
1125 let get = |key: &str| {
1126 content
1127 .get(key)
1128 .and_then(|value| value.as_str())
1129 .unwrap_or("")
1130 };
1131 let transport = get("transport");
1132 if get("protocol") != "iris-nearby-v1"
1133 || !(transport == "ble" || transport == "nearby" || transport == "lan")
1134 || get("peer_id") != peer_id.trim()
1135 || get("my_nonce") != their_nonce.trim()
1136 || get("their_nonce") != my_nonce.trim()
1137 {
1138 return String::new();
1139 }
1140
1141 let now = SystemTime::now()
1142 .duration_since(UNIX_EPOCH)
1143 .unwrap_or_default()
1144 .as_secs();
1145 let expires_at = content
1146 .get("expires_at")
1147 .and_then(|value| value.as_u64())
1148 .unwrap_or(0);
1149 let created_at = event.created_at.as_secs();
1150 if expires_at < now
1151 || expires_at > now.saturating_add(300)
1152 || created_at.saturating_add(300) < now
1153 || created_at > now.saturating_add(300)
1154 {
1155 return String::new();
1156 }
1157
1158 let profile_event_id = get("profile_event_id");
1159 let profile_event_id = if profile_event_id.len() == 64 {
1160 profile_event_id
1161 } else {
1162 ""
1163 };
1164 serde_json::json!({
1165 "owner_pubkey_hex": event.pubkey.to_hex(),
1166 "profile_event_id": profile_event_id,
1167 })
1168 .to_string()
1169}
1170
1171impl Drop for FfiApp {
1172 fn drop(&mut self) {
1173 let _ = self.foreground_tx.send(CoreMsg::Shutdown(None));
1174 }
1175}
1176
1177#[uniffi::export]
1178pub fn normalize_peer_input(input: String) -> String {
1179 ffi_or("normalize_peer_input", String::new(), || {
1180 crate::core::normalize_peer_input_for_display(&input)
1181 })
1182}
1183
1184#[uniffi::export]
1185pub fn is_valid_peer_input(input: String) -> bool {
1186 ffi_or("is_valid_peer_input", false, || {
1187 crate::core::parse_peer_input(&input).is_ok()
1188 })
1189}
1190
1191#[uniffi::export]
1197pub fn classify_chat_input(input: String) -> Option<ChatInputShortcut> {
1198 ffi_or("classify_chat_input", None, || chat_input_shortcut(&input))
1199}
1200
1201fn chat_input_shortcut(raw: &str) -> Option<ChatInputShortcut> {
1202 let trimmed = raw.trim();
1203 if trimmed.is_empty() {
1204 return None;
1205 }
1206 let lower = trimmed.to_lowercase();
1207 if lower.contains("://") && lower.contains('#') {
1208 return Some(ChatInputShortcut::Invite {
1209 invite_input: trimmed.to_string(),
1210 display: short_invite_display(trimmed),
1211 });
1212 }
1213 if crate::core::parse_peer_input(trimmed).is_ok() {
1214 use nostr::nips::nip19::ToBech32;
1215 let normalized = crate::core::normalize_peer_input_for_display(trimmed);
1216 if let Ok(pubkey) = nostr::PublicKey::parse(&normalized) {
1217 let npub = pubkey.to_bech32().unwrap_or_else(|_| normalized.clone());
1218 let display = short_npub_display(&npub);
1219 return Some(ChatInputShortcut::DirectPeer {
1220 peer_input: normalized,
1221 display,
1222 npub,
1223 pubkey_hex: pubkey.to_hex(),
1224 });
1225 }
1226 }
1227 None
1228}
1229
1230fn short_npub_display(npub: &str) -> String {
1231 if npub.len() > 16 {
1232 format!("{}…{}", &npub[..10], &npub[npub.len() - 4..])
1233 } else {
1234 npub.to_string()
1235 }
1236}
1237
1238fn short_invite_display(invite: &str) -> String {
1239 let after_scheme = invite
1243 .split_once("://")
1244 .map(|(_, rest)| rest)
1245 .unwrap_or(invite);
1246 if after_scheme.len() > 32 {
1247 format!("{}…", &after_scheme[..32])
1248 } else {
1249 after_scheme.to_string()
1250 }
1251}
1252
1253#[uniffi::export]
1258pub fn peer_input_to_hex(input: String) -> String {
1259 ffi_or("peer_input_to_hex", String::new(), || {
1260 let normalized = crate::core::normalize_peer_input_for_display(&input);
1261 match nostr::PublicKey::parse(&normalized) {
1262 Ok(pubkey) => pubkey.to_hex(),
1263 Err(_) => String::new(),
1264 }
1265 })
1266}
1267
1268#[uniffi::export]
1271pub fn peer_input_to_npub(input: String) -> String {
1272 ffi_or("peer_input_to_npub", String::new(), || {
1273 use nostr::nips::nip19::ToBech32;
1274 let normalized = crate::core::normalize_peer_input_for_display(&input);
1275 match nostr::PublicKey::parse(&normalized) {
1276 Ok(pubkey) => pubkey.to_bech32().unwrap_or(normalized),
1277 Err(_) => normalized,
1278 }
1279 })
1280}
1281
1282#[uniffi::export]
1283pub fn build_summary() -> String {
1284 ffi_or("build_summary", String::new(), crate::core::build_summary)
1285}
1286
1287#[uniffi::export]
1288pub fn relay_set_id() -> String {
1289 ffi_or("relay_set_id", String::new(), || {
1290 crate::core::relay_set_id().to_string()
1291 })
1292}
1293
1294#[uniffi::export]
1295pub fn proxied_image_url(
1296 original_src: String,
1297 preferences: PreferencesSnapshot,
1298 width: Option<u32>,
1299 height: Option<u32>,
1300 square: bool,
1301) -> String {
1302 ffi_or("proxied_image_url", original_src.clone(), || {
1303 image_proxy::proxied_image_url(&original_src, &preferences, width, height, square)
1304 })
1305}
1306
1307#[uniffi::export]
1308pub fn is_trusted_test_build() -> bool {
1309 ffi_or(
1310 "is_trusted_test_build",
1311 false,
1312 crate::core::trusted_test_build_flag,
1313 )
1314}
1315
1316#[uniffi::export]
1321pub fn app_version() -> String {
1322 crate::core::app_version_string().to_string()
1323}
1324
1325#[uniffi::export]
1326pub fn resolve_mobile_push_notification_payload(
1327 raw_payload_json: String,
1328) -> MobilePushNotificationResolution {
1329 ffi_or(
1330 "resolve_mobile_push_notification_payload",
1331 suppressed_mobile_push_resolution(),
1332 || crate::core::resolve_mobile_push_notification(raw_payload_json),
1333 )
1334}
1335
1336#[uniffi::export]
1342pub fn decrypt_mobile_push_notification_payload(
1343 data_dir: String,
1344 owner_pubkey_hex: String,
1345 device_nsec: String,
1346 raw_payload_json: String,
1347) -> MobilePushNotificationResolution {
1348 ffi_or(
1349 "decrypt_mobile_push_notification_payload",
1350 suppressed_mobile_push_resolution(),
1351 || {
1352 crate::core::decrypt_mobile_push_notification(
1353 data_dir,
1354 owner_pubkey_hex,
1355 device_nsec,
1356 raw_payload_json,
1357 )
1358 },
1359 )
1360}
1361
1362#[uniffi::export]
1363pub fn resolve_mobile_push_subscription_server_url(
1364 platform_key: String,
1365 is_release: bool,
1366 override_url: Option<String>,
1367) -> String {
1368 ffi_or(
1369 "resolve_mobile_push_subscription_server_url",
1370 String::new(),
1371 || crate::core::resolve_mobile_push_server_url(platform_key, is_release, override_url),
1372 )
1373}
1374
1375#[uniffi::export]
1376pub fn mobile_push_subscription_id_key(platform_key: String) -> String {
1377 ffi_or("mobile_push_subscription_id_key", String::new(), || {
1378 crate::core::mobile_push_stored_subscription_id_key(platform_key)
1379 })
1380}
1381
1382#[uniffi::export]
1383pub fn build_mobile_push_list_subscriptions_request(
1384 owner_nsec: String,
1385 platform_key: String,
1386 is_release: bool,
1387 server_url_override: Option<String>,
1388) -> Option<MobilePushSubscriptionRequest> {
1389 ffi_or("build_mobile_push_list_subscriptions_request", None, || {
1390 crate::core::build_mobile_push_list_subscriptions_request(
1391 owner_nsec,
1392 platform_key,
1393 is_release,
1394 server_url_override,
1395 )
1396 })
1397}
1398
1399#[uniffi::export]
1400#[allow(clippy::too_many_arguments)]
1401pub fn build_mobile_push_create_subscription_request(
1402 owner_nsec: String,
1403 platform_key: String,
1404 push_token: String,
1405 apns_topic: Option<String>,
1406 message_author_pubkeys: Vec<String>,
1407 invite_response_pubkeys: Vec<String>,
1408 is_release: bool,
1409 server_url_override: Option<String>,
1410) -> Option<MobilePushSubscriptionRequest> {
1411 ffi_or(
1412 "build_mobile_push_create_subscription_request",
1413 None,
1414 || {
1415 crate::core::build_mobile_push_create_subscription_request(
1416 owner_nsec,
1417 platform_key,
1418 push_token,
1419 apns_topic,
1420 message_author_pubkeys,
1421 invite_response_pubkeys,
1422 is_release,
1423 server_url_override,
1424 )
1425 },
1426 )
1427}
1428
1429#[uniffi::export]
1430#[allow(clippy::too_many_arguments)]
1431pub fn build_mobile_push_update_subscription_request(
1432 owner_nsec: String,
1433 subscription_id: String,
1434 platform_key: String,
1435 push_token: String,
1436 apns_topic: Option<String>,
1437 message_author_pubkeys: Vec<String>,
1438 invite_response_pubkeys: Vec<String>,
1439 is_release: bool,
1440 server_url_override: Option<String>,
1441) -> Option<MobilePushSubscriptionRequest> {
1442 ffi_or(
1443 "build_mobile_push_update_subscription_request",
1444 None,
1445 || {
1446 crate::core::build_mobile_push_update_subscription_request(
1447 owner_nsec,
1448 subscription_id,
1449 platform_key,
1450 push_token,
1451 apns_topic,
1452 message_author_pubkeys,
1453 invite_response_pubkeys,
1454 is_release,
1455 server_url_override,
1456 )
1457 },
1458 )
1459}
1460
1461#[uniffi::export]
1462pub fn build_mobile_push_delete_subscription_request(
1463 owner_nsec: String,
1464 subscription_id: String,
1465 platform_key: String,
1466 is_release: bool,
1467 server_url_override: Option<String>,
1468) -> Option<MobilePushSubscriptionRequest> {
1469 ffi_or(
1470 "build_mobile_push_delete_subscription_request",
1471 None,
1472 || {
1473 crate::core::build_mobile_push_delete_subscription_request(
1474 owner_nsec,
1475 subscription_id,
1476 platform_key,
1477 is_release,
1478 server_url_override,
1479 )
1480 },
1481 )
1482}
1483
1484#[cfg(test)]
1485mod ffi_hardening_tests {
1486 use super::*;
1487
1488 #[test]
1489 fn ffi_guard_returns_fallback_after_panic() {
1490 let value = ffi_or("test.panic", 42, || -> i32 {
1491 panic!("ffi boom");
1492 });
1493
1494 assert_eq!(value, 42);
1495 }
1496
1497 #[test]
1498 fn core_batch_guard_converts_panic_to_error() {
1499 let result = catch_core_batch(|| -> bool {
1500 panic!("batch boom");
1501 });
1502
1503 assert_eq!(result, Err("batch boom".to_string()));
1504 }
1505
1506 #[test]
1507 fn core_batch_guard_preserves_success_result() {
1508 assert_eq!(catch_core_batch(|| true), Ok(true));
1509 assert_eq!(catch_core_batch(|| false), Ok(false));
1510 }
1511}