Skip to main content

huddle_core/app/
mod.rs

1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use base64::engine::general_purpose::STANDARD as B64;
9use base64::Engine;
10use libp2p::{Multiaddr, PeerId};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info, warn};
13
14use crate::config;
15use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
16use crate::crypto::RoomCrypto;
17use crate::error::{HuddleError, Result};
18use crate::files::encryption::{self as file_encryption, EncryptedFileMeta};
19use crate::files::FileManager;
20use crate::identity::Identity;
21use crate::network::events::NetworkEvent;
22use crate::network::protocol::{RoomAnnouncement, RoomMessage};
23use crate::network::{self, NetworkHandle, NetworkMode};
24use crate::storage::repo::{
25    self, derive_room_id, AttachmentStatus, KnownPeer, StoredAttachment, StoredRoom,
26    StoredRoomMember,
27};
28use crate::storage::{self, Db};
29
30pub use self::events::{AppEvent, DiscoveredRoom};
31
/// Lobby-facing view of a known dial peer: persisted address plus
/// runtime "is the connection currently up?" status.
#[derive(Debug, Clone)]
pub struct KnownPeerStatus {
    /// Address exactly as stored for this peer.
    pub address: String,
    /// Optional label stored alongside the address.
    pub label: Option<String>,
    /// Stored time of the last successful connection, if any
    /// (presumably unix seconds like the other timestamps here — confirm).
    pub last_connected_at: Option<i64>,
    /// Peer id found for this address in the in-process connection map;
    /// `None` when no live connection is tracked for it.
    pub connected_peer_id: Option<PeerId>,
}
41
42/// Parse a user-entered dial address into a libp2p `Multiaddr`.
43/// Accepts `ip:port`, `[ipv6]:port`, or a raw multiaddr starting with `/`.
44pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
45    let trimmed = input.trim();
46    if trimmed.is_empty() {
47        return Err(HuddleError::Other("address is empty".into()));
48    }
49    if trimmed.starts_with('/') {
50        return trimmed
51            .parse::<Multiaddr>()
52            .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
53    }
54    if let Some(rest) = trimmed.strip_prefix('[') {
55        let (host, port) = rest
56            .split_once("]:")
57            .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
58        let port: u16 = port
59            .parse()
60            .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
61        return format!("/ip6/{}/tcp/{}", host, port)
62            .parse::<Multiaddr>()
63            .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
64    }
65    let (host, port) = trimmed
66        .rsplit_once(':')
67        .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
68    if host.contains(':') {
69        return Err(HuddleError::Other(format!(
70            "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
71        )));
72    }
73    let port: u16 = port
74        .parse()
75        .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
76    format!("/ip4/{}/tcp/{}", host, port)
77        .parse::<Multiaddr>()
78        .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
79}
80
/// State for a room we've created or joined this session.
struct ActiveRoom {
    /// Room metadata, in the same shape that is written to storage.
    info: StoredRoom,
    /// Per-room crypto state; `None` for unencrypted rooms.
    crypto: Option<RoomCrypto>,
    /// Argon2id-derived 32-byte key for unwrapping incoming session keys.
    /// None for unencrypted rooms.
    passphrase_key: Option<[u8; KEY_LEN]>,
    /// Fingerprints of members currently known to be in the room.
    members: HashSet<String>,
    /// Ephemeral typing indicators: fingerprint → unix expiry. Pruned
    /// on read; never persisted.
    typers: HashMap<String, i64>,
}
94
/// Lifetime of a typing indicator in seconds (presumably added to "now" to
/// form the expiry stored in `ActiveRoom::typers` — confirm at the use site).
const TYPING_TTL_SECS: i64 = 3;

/// TTL for a discovered room before it's considered stale (re-announcements
/// happen every 15 seconds; after 45s of silence we drop it).
const DISCOVERED_TTL_SECS: i64 = 45;
/// Seconds between periodic re-announcements of every active room.
const ANNOUNCE_INTERVAL_SECS: u64 = 15;
101
/// Cloneable handle to the running application. Each background task
/// spawned by the handle works on its own clone.
#[derive(Clone)]
pub struct AppHandle {
    /// This node's identity (source of the fingerprint and peer id).
    identity: Arc<Identity>,
    /// Handle used to send commands to the network layer.
    network: NetworkHandle,
    /// Network mode the handle was started with.
    mode: NetworkMode,
    /// Rooms created, joined, or restored in this session, keyed by room id.
    active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
    /// Rooms learned from received announcements, keyed by room id.
    discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
    /// Encrypted rooms loaded from storage that we haven't rejoined yet
    /// in this session (their passphrase-derived key isn't in memory).
    /// Surfaced in the lobby so the user can re-enter with passphrase.
    restorable_rooms: Arc<Mutex<HashMap<String, StoredRoom>>>,
    /// Peer addresses we've dialed in this process; tracks "is the
    /// connection currently up" for known peers shown in the lobby.
    connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
    /// File chunking + cache + downloads.
    file_manager: Arc<FileManager>,
    /// Database handle passed to every `repo::` persistence call.
    db: Db,
    /// Sending side of the `AppEvent` broadcast channel; `subscribe`
    /// hands out receivers.
    app_event_tx: broadcast::Sender<AppEvent>,
}
121
122impl AppHandle {
    /// Start the app with the defaults: mDNS network mode, port `0`, and
    /// no master key.
    pub async fn start() -> Result<Self> {
        Self::start_with_options(NetworkMode::Mdns, 0, None).await
    }
126
    /// Start the app with an explicit network mode, port, and optional
    /// 32-byte master key.
    ///
    /// Ensures the data directory exists, opens the database at the
    /// configured path (forwarding `master_key` to `storage::open_db`), and
    /// then delegates to `start_with_db_and_options`.
    ///
    /// # Errors
    /// Propagates failures from data-directory setup, opening the database,
    /// or the delegated startup.
    pub async fn start_with_options(
        mode: NetworkMode,
        port: u16,
        master_key: Option<&[u8; 32]>,
    ) -> Result<Self> {
        config::ensure_data_dir()?;
        // With a master key, derive a dedicated "megolm-persist" subkey and
        // install it as the session persist key before the database opens.
        if let Some(mk) = master_key {
            let subkey = storage::keychain::derive_subkey(mk, b"megolm-persist");
            crate::crypto::megolm::install_session_persist_key(subkey);
        }
        let db = storage::open_db(&config::db_path(), master_key)?;
        Self::start_with_db_and_options(db, mode, port).await
    }
140
    /// Start the app on an already-open database, using mDNS network mode
    /// and port `0`.
    pub async fn start_with_db(db: Db) -> Result<Self> {
        Self::start_with_db_and_options(db, NetworkMode::Mdns, 0).await
    }
144
145    pub async fn start_with_db_and_options(
146        db: Db,
147        mode: NetworkMode,
148        port: u16,
149    ) -> Result<Self> {
150        let identity = Self::load_or_create_identity(&db)?;
151        let identity = Arc::new(identity);
152        info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, "identity loaded");
153
154        let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
155        let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
156        let network = network::start_network_with(&identity, net_event_tx, mode, port)?;
157
158        let active_rooms = Arc::new(Mutex::new(HashMap::new()));
159        let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
160        let restorable_rooms = Arc::new(Mutex::new(HashMap::new()));
161        let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
162        let file_manager = Arc::new(FileManager::new(&config::data_dir())?);
163
164        let handle = Self {
165            identity,
166            network,
167            mode,
168            active_rooms,
169            discovered_rooms,
170            restorable_rooms,
171            connected_dial_addrs,
172            file_manager,
173            db,
174            app_event_tx,
175        };
176
177        handle.spawn_event_processor(net_event_rx);
178        handle.spawn_announcement_ticker();
179        handle.spawn_discovered_room_pruner();
180        handle.spawn_known_peer_reconnector();
181        handle.restore_rooms_from_db().await;
182
183        Ok(handle)
184    }
185
    /// Network mode this handle was started with.
    pub fn mode(&self) -> NetworkMode {
        self.mode
    }
189
    /// Create a new receiver on the `AppEvent` broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
        self.app_event_tx.subscribe()
    }
193
    /// Fingerprint string of the local identity.
    pub fn fingerprint(&self) -> &str {
        self.identity.fingerprint()
    }
197
    /// libp2p peer id of the local identity.
    pub fn peer_id(&self) -> PeerId {
        self.identity.peer_id()
    }
201
202    pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
203        let now = now_unix();
204        let mut by_id: HashMap<String, DiscoveredRoom> = self
205            .discovered_rooms
206            .lock()
207            .unwrap()
208            .clone();
209
210        // Merge in rooms we're currently in — gossipsub doesn't echo our
211        // own announcements back to us, so without this our own hosted
212        // rooms wouldn't appear in the lobby.
213        for room in self.active_rooms.lock().unwrap().values() {
214            let entry = DiscoveredRoom {
215                room_id: room.info.id.clone(),
216                name: room.info.name.clone(),
217                encrypted: room.info.encrypted,
218                member_count: room.members.len() as u32,
219                creator_fingerprint: room.info.creator_fingerprint.clone(),
220                last_seen: now,
221                restorable: false,
222            };
223            by_id
224                .entry(room.info.id.clone())
225                .and_modify(|d| {
226                    d.last_seen = now;
227                    if entry.member_count > d.member_count {
228                        d.member_count = entry.member_count;
229                    }
230                    d.restorable = false;
231                })
232                .or_insert(entry);
233        }
234
235        // Encrypted rooms we have on disk but haven't rejoined this
236        // session. Only surface them when no fresh discovery / active
237        // entry exists for the same room.
238        for (id, stored) in self.restorable_rooms.lock().unwrap().iter() {
239            if by_id.contains_key(id) {
240                continue;
241            }
242            by_id.insert(
243                id.clone(),
244                DiscoveredRoom {
245                    room_id: id.clone(),
246                    name: stored.name.clone(),
247                    encrypted: stored.encrypted,
248                    member_count: 0,
249                    creator_fingerprint: stored.creator_fingerprint.clone(),
250                    last_seen: stored.last_active.unwrap_or(stored.created_at),
251                    restorable: true,
252                },
253            );
254        }
255
256        let mut v: Vec<DiscoveredRoom> = by_id.into_values().collect();
257        v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
258        v
259    }
260
    /// Ids of all rooms currently in the active set, in no particular
    /// order (they come straight from the map's keys).
    pub fn active_room_ids(&self) -> Vec<String> {
        self.active_rooms.lock().unwrap().keys().cloned().collect()
    }
264
    /// Clone of the metadata for an active room, or `None` when `room_id`
    /// is not in the active set.
    pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
        self.active_rooms
            .lock()
            .unwrap()
            .get(room_id)
            .map(|r| r.info.clone())
    }
272
273    pub fn room_members(&self, room_id: &str) -> Vec<String> {
274        self.active_rooms
275            .lock()
276            .unwrap()
277            .get(room_id)
278            .map(|r| {
279                let mut m: Vec<String> = r.members.iter().cloned().collect();
280                m.sort();
281                m
282            })
283            .unwrap_or_default()
284    }
285
    /// Fetch stored messages for `room_id` from the database, passing
    /// `limit` through to `repo::get_room_messages`.
    pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
        repo::get_room_messages(&self.db, room_id, limit)
    }
289
    /// Search the stored messages of `room_id` for `query`, passing `limit`
    /// through to `repo::search_room_messages`.
    pub fn search_room_messages(
        &self,
        room_id: &str,
        query: &str,
        limit: i64,
    ) -> Result<Vec<repo::StoredRoomMessage>> {
        repo::search_room_messages(&self.db, room_id, query, limit)
    }
298
299    /// Create a new room. Returns its room_id.
300    pub async fn start_room(
301        &self,
302        name: &str,
303        encrypted: bool,
304        passphrase: Option<&str>,
305    ) -> Result<String> {
306        if encrypted && passphrase.is_none() {
307            return Err(HuddleError::Other(
308                "encrypted room requires a passphrase".into(),
309            ));
310        }
311
312        let created_at = now_unix();
313        let creator_fp = self.identity.fingerprint().to_string();
314        let room_id = derive_room_id(&creator_fp, name, created_at);
315
316        let (passphrase_salt, passphrase_key) = if encrypted {
317            let salt = passphrase::random_salt();
318            let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
319            (Some(salt.to_vec()), Some(key))
320        } else {
321            (None, None)
322        };
323
324        let info = StoredRoom {
325            id: room_id.clone(),
326            name: name.to_string(),
327            creator_fingerprint: creator_fp.clone(),
328            encrypted,
329            passphrase_salt: passphrase_salt.clone(),
330            created_at,
331            last_active: Some(created_at),
332        };
333        repo::insert_room(&self.db, &info)?;
334
335        let crypto = if encrypted {
336            Some(RoomCrypto::new_for_room(
337                self.db.clone(),
338                room_id.clone(),
339                creator_fp.clone(),
340            )?)
341        } else {
342            None
343        };
344
345        let mut members = HashSet::new();
346        members.insert(creator_fp.clone());
347
348        self.active_rooms.lock().unwrap().insert(
349            room_id.clone(),
350            ActiveRoom {
351                info: info.clone(),
352                crypto,
353                passphrase_key,
354                members,
355                typers: HashMap::new(),
356            },
357        );
358
359        self.network.subscribe_room(room_id.clone()).await;
360        self.announce_room_now(&info, 1).await;
361
362        // Broadcast our presence in the room (with our wrapped session key
363        // if encrypted). Use a small delay so the subscription propagates.
364        let app = self.clone();
365        let rid = room_id.clone();
366        tokio::spawn(async move {
367            tokio::time::sleep(Duration::from_millis(500)).await;
368            if let Err(e) = app.broadcast_member_announce(&rid).await {
369                warn!(%e, "broadcast member announce");
370            }
371        });
372
373        let _ = self.app_event_tx.send(AppEvent::RoomJoined {
374            room_id: room_id.clone(),
375        });
376
377        Ok(room_id)
378    }
379
380    /// Join an existing room. The room may come from a live announcement
381    /// (preferred), our restorable set, or the DB directly — whichever has
382    /// the freshest copy. For encrypted rooms `passphrase` is required.
383    pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
384        // Resolve room metadata from the freshest available source.
385        let (name, creator_fingerprint, encrypted, salt_opt) = {
386            if let Some(d) = self.discovered_rooms.lock().unwrap().get(room_id).cloned() {
387                let salt = self.get_room_salt(room_id);
388                (d.name, d.creator_fingerprint, d.encrypted, salt)
389            } else if let Some(stored) = self.restorable_rooms.lock().unwrap().get(room_id).cloned()
390            {
391                (
392                    stored.name,
393                    stored.creator_fingerprint,
394                    stored.encrypted,
395                    stored.passphrase_salt,
396                )
397            } else if let Some(stored) = repo::get_room(&self.db, room_id)? {
398                (
399                    stored.name,
400                    stored.creator_fingerprint,
401                    stored.encrypted,
402                    stored.passphrase_salt,
403                )
404            } else {
405                return Err(HuddleError::Other(format!("room {room_id} not found")));
406            }
407        };
408
409        if encrypted && passphrase.is_none() {
410            return Err(HuddleError::Other(
411                "encrypted room requires a passphrase".into(),
412            ));
413        }
414
415        let passphrase_key = if encrypted {
416            let salt = salt_opt
417                .clone()
418                .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
419            Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
420        } else {
421            None
422        };
423
424        let info = StoredRoom {
425            id: room_id.to_string(),
426            name,
427            creator_fingerprint,
428            encrypted,
429            passphrase_salt: salt_opt.clone(),
430            created_at: now_unix(),
431            last_active: Some(now_unix()),
432        };
433        repo::insert_room(&self.db, &info)?;
434
435        let crypto = if encrypted {
436            Some(RoomCrypto::new_for_room(
437                self.db.clone(),
438                room_id.to_string(),
439                self.identity.fingerprint().to_string(),
440            )?)
441        } else {
442            None
443        };
444
445        let mut members = HashSet::new();
446        members.insert(self.identity.fingerprint().to_string());
447
448        self.active_rooms.lock().unwrap().insert(
449            room_id.to_string(),
450            ActiveRoom {
451                info: info.clone(),
452                crypto,
453                passphrase_key,
454                members,
455                typers: HashMap::new(),
456            },
457        );
458        // No longer "restorable" now that we've rejoined.
459        self.restorable_rooms.lock().unwrap().remove(room_id);
460
461        self.network.subscribe_room(room_id.to_string()).await;
462
463        let app = self.clone();
464        let rid = room_id.to_string();
465        tokio::spawn(async move {
466            tokio::time::sleep(Duration::from_millis(500)).await;
467            if let Err(e) = app.broadcast_member_announce(&rid).await {
468                warn!(%e, "broadcast member announce");
469            }
470            // Ask existing members for their session keys.
471            let req = RoomMessage::SessionKeyRequest {
472                requester_fingerprint: app.identity.fingerprint().to_string(),
473            };
474            if let Ok(bytes) = serde_json::to_vec(&req) {
475                app.network.publish_room_message(rid.clone(), bytes).await;
476            }
477        });
478
479        let _ = self.app_event_tx.send(AppEvent::RoomJoined {
480            room_id: room_id.to_string(),
481        });
482
483        Ok(())
484    }
485
486    /// Walk the rooms table at startup. Non-encrypted rooms are silently
487    /// restored (subscribed + re-announced). Encrypted rooms get added to
488    /// `restorable_rooms` so the lobby surfaces them and the user can
489    /// re-enter via the join flow with passphrase.
490    async fn restore_rooms_from_db(&self) {
491        let rooms = match repo::list_rooms(&self.db) {
492            Ok(v) => v,
493            Err(e) => {
494                warn!(%e, "list rooms on restore");
495                return;
496            }
497        };
498        let our_fp = self.identity.fingerprint().to_string();
499        let count = rooms.len();
500        for info in rooms {
501            if info.encrypted {
502                self.restorable_rooms
503                    .lock()
504                    .unwrap()
505                    .insert(info.id.clone(), info);
506                continue;
507            }
508            let mut members = HashSet::new();
509            members.insert(our_fp.clone());
510            if let Ok(stored_members) = repo::list_room_members(&self.db, &info.id) {
511                for m in stored_members {
512                    members.insert(m.fingerprint);
513                }
514            }
515            self.active_rooms.lock().unwrap().insert(
516                info.id.clone(),
517                ActiveRoom {
518                    info: info.clone(),
519                    crypto: None,
520                    passphrase_key: None,
521                    members,
522                    typers: HashMap::new(),
523                },
524            );
525            self.network.subscribe_room(info.id.clone()).await;
526            self.announce_room_now(&info, 1).await;
527            info!(room_id = %info.id, name = %info.name, "restored room");
528        }
529        if count > 0 {
530            debug!(count, "restored rooms from db");
531        }
532    }
533
534    pub async fn leave_room(&self, room_id: &str) -> Result<()> {
535        // Broadcast a leave message before unsubscribing.
536        let leave_msg = RoomMessage::MemberLeave {
537            sender_fingerprint: self.identity.fingerprint().to_string(),
538        };
539        if let Ok(bytes) = serde_json::to_vec(&leave_msg) {
540            self.network
541                .publish_room_message(room_id.to_string(), bytes)
542                .await;
543        }
544
545        self.active_rooms.lock().unwrap().remove(room_id);
546        self.network.unsubscribe_room(room_id.to_string()).await;
547
548        let _ = self.app_event_tx.send(AppEvent::RoomLeft {
549            room_id: room_id.to_string(),
550        });
551        Ok(())
552    }
553
554    pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
555        let our_fp = self.identity.fingerprint().to_string();
556        let msg = {
557            let mut rooms = self.active_rooms.lock().unwrap();
558            let room = rooms
559                .get_mut(room_id)
560                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
561
562            if room.info.encrypted {
563                let crypto = room
564                    .crypto
565                    .as_mut()
566                    .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
567                let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
568                RoomMessage::Encrypted {
569                    sender_fingerprint: our_fp.clone(),
570                    session_id,
571                    ciphertext_b64: base64::Engine::encode(
572                        &base64::engine::general_purpose::STANDARD,
573                        &ct_bytes,
574                    ),
575                }
576            } else {
577                RoomMessage::Plain {
578                    sender_fingerprint: our_fp.clone(),
579                    body: body.to_string(),
580                }
581            }
582        };
583
584        let bytes = serde_json::to_vec(&msg)?;
585        self.network
586            .publish_room_message(room_id.to_string(), bytes)
587            .await;
588
589        let now = now_unix();
590        let msg_id =
591            repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
592        repo::update_room_last_active(&self.db, room_id, now)?;
593
594        let _ = self.app_event_tx.send(AppEvent::MessageSent {
595            room_id: room_id.to_string(),
596            body: body.to_string(),
597            message_id: msg_id,
598        });
599
600        Ok(())
601    }
602
    /// Ask the network layer to shut down and wait for that call to return.
    pub async fn shutdown(&self) {
        self.network.shutdown().await;
    }
606
607    // -------------------------------------------------------------------
608    // Dial / known peers
609    // -------------------------------------------------------------------
610
611    /// Dial a peer by a user-entered address. Accepts:
612    /// - `1.2.3.4:9000`
613    /// - `[fe80::1]:9000`
614    /// - `/ip4/.../tcp/...[/p2p/<peer>]` (raw multiaddr)
615    pub async fn dial(&self, input: &str) -> Result<()> {
616        let multiaddr = parse_dial_address(input)?;
617        let canonical = multiaddr.to_string();
618        info!(%canonical, "dialing");
619
620        repo::upsert_known_peer(
621            &self.db,
622            &KnownPeer {
623                address: canonical.clone(),
624                label: None,
625                last_connected_at: None,
626                last_attempt_at: Some(now_unix()),
627                created_at: now_unix(),
628            },
629        )?;
630
631        let _ = self.app_event_tx.send(AppEvent::Dialing {
632            address: canonical.clone(),
633        });
634        self.network.dial(multiaddr).await;
635        Ok(())
636    }
637
638    pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
639        let connected = self.connected_dial_addrs.lock().unwrap().clone();
640        let stored = repo::list_known_peers(&self.db).unwrap_or_default();
641        stored
642            .into_iter()
643            .map(|p| {
644                let connected_peer = connected.get(&p.address).copied();
645                KnownPeerStatus {
646                    address: p.address,
647                    label: p.label,
648                    last_connected_at: p.last_connected_at,
649                    connected_peer_id: connected_peer,
650                }
651            })
652            .collect()
653    }
654
    /// Remove a known peer: delete it from storage, then drop its entry from
    /// the in-process connected-address map.
    ///
    /// # Errors
    /// Propagates a storage failure; the in-memory entry is left untouched
    /// in that case.
    pub async fn forget_peer(&self, address: &str) -> Result<()> {
        repo::forget_known_peer(&self.db, address)?;
        self.connected_dial_addrs.lock().unwrap().remove(address);
        Ok(())
    }
660
    /// Re-dial a stored address — used by the lobby's "reconnect" action.
    /// Simply forwards to `dial`, so the address is re-parsed and the
    /// known-peer record is upserted again.
    pub async fn redial(&self, address: &str) -> Result<()> {
        self.dial(address).await
    }
665
666    fn spawn_known_peer_reconnector(&self) {
667        let handle = self.clone();
668        tokio::spawn(async move {
669            // Brief delay so listeners come up first.
670            tokio::time::sleep(Duration::from_millis(500)).await;
671            let known = repo::list_known_peers(&handle.db).unwrap_or_default();
672            for peer in known {
673                if let Err(e) = handle.dial(&peer.address).await {
674                    debug!(%e, addr = %peer.address, "auto-reconnect failed");
675                }
676            }
677        });
678    }
679
680    // -------------------------------------------------------------------
681    // Internal helpers
682    // -------------------------------------------------------------------
683
684    fn load_or_create_identity(db: &Db) -> Result<Identity> {
685        if let Some(stored) = repo::load_identity(db)? {
686            let mut bytes = [0u8; 32];
687            bytes.copy_from_slice(&stored.ed25519_secret);
688            Identity::from_secret_bytes(bytes)
689        } else {
690            let id = Identity::generate()?;
691            repo::save_identity(db, &id.secret_bytes(), now_unix())?;
692            Ok(id)
693        }
694    }
695
696    fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
697        self.active_rooms
698            .lock()
699            .unwrap()
700            .get(room_id)
701            .and_then(|r| r.info.passphrase_salt.clone())
702            .or_else(|| {
703                // Try the cached announcement salt
704                ROOM_SALT_CACHE
705                    .lock()
706                    .unwrap()
707                    .get(room_id)
708                    .cloned()
709            })
710    }
711
712    async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
713        let ann = RoomAnnouncement {
714            room_id: info.id.clone(),
715            name: info.name.clone(),
716            encrypted: info.encrypted,
717            passphrase_salt: info.passphrase_salt.clone(),
718            member_count,
719            creator_fingerprint: info.creator_fingerprint.clone(),
720            announced_at: now_unix(),
721        };
722        self.network.announce_room(ann).await;
723    }
724
725    async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
726        let our_fp = self.identity.fingerprint().to_string();
727        let wrapped = {
728            let mut rooms = self.active_rooms.lock().unwrap();
729            let room = rooms
730                .get_mut(room_id)
731                .ok_or_else(|| HuddleError::Other("not in room".into()))?;
732            if room.info.encrypted {
733                let crypto = room.crypto.as_mut().unwrap();
734                let session_key = crypto.our_session_key_b64();
735                let passphrase_key = room
736                    .passphrase_key
737                    .as_ref()
738                    .ok_or_else(|| HuddleError::Session("missing passphrase key".into()))?;
739                Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
740            } else {
741                None
742            }
743        };
744        let display_name = repo::get_display_name(&self.db).unwrap_or(None);
745        let msg = RoomMessage::MemberAnnounce {
746            sender_fingerprint: our_fp,
747            wrapped_session_key: wrapped,
748            display_name,
749        };
750        let bytes = serde_json::to_vec(&msg)?;
751        self.network
752            .publish_room_message(room_id.to_string(), bytes)
753            .await;
754        Ok(())
755    }
756
757    fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
758        let handle = self.clone();
759        tokio::spawn(async move {
760            while let Some(event) = net_rx.recv().await {
761                handle.process_network_event(event).await;
762            }
763            info!("event processor stopped");
764        });
765    }
766
767    fn spawn_announcement_ticker(&self) {
768        let handle = self.clone();
769        tokio::spawn(async move {
770            let mut interval =
771                tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
772            interval.tick().await; // skip the immediate tick
773            loop {
774                interval.tick().await;
775                let snapshot: Vec<(StoredRoom, u32)> = {
776                    let active = handle.active_rooms.lock().unwrap();
777                    active
778                        .values()
779                        .map(|r| (r.info.clone(), r.members.len() as u32))
780                        .collect()
781                };
782                for (info, member_count) in snapshot {
783                    handle.announce_room_now(&info, member_count).await;
784                }
785            }
786        });
787    }
788
789    fn spawn_discovered_room_pruner(&self) {
790        let handle = self.clone();
791        tokio::spawn(async move {
792            let mut interval = tokio::time::interval(Duration::from_secs(10));
793            interval.tick().await;
794            loop {
795                interval.tick().await;
796                let now = now_unix();
797                let mut to_drop = Vec::new();
798                {
799                    let mut map = handle.discovered_rooms.lock().unwrap();
800                    map.retain(|id, r| {
801                        if now - r.last_seen > DISCOVERED_TTL_SECS {
802                            to_drop.push(id.clone());
803                            false
804                        } else {
805                            true
806                        }
807                    });
808                }
809                for id in to_drop {
810                    let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
811                }
812            }
813        });
814    }
815
816    async fn process_network_event(&self, event: NetworkEvent) {
817        match event {
818            NetworkEvent::PeerDiscovered { peer_id } => {
819                let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
820            }
821            NetworkEvent::PeerExpired { .. } => {}
822            NetworkEvent::ListeningOn { address } => {
823                let _ = self.app_event_tx.send(AppEvent::ListeningOn {
824                    address: address.to_string(),
825                });
826            }
827            NetworkEvent::RoomAnnouncementReceived(ann) => {
828                let our_fp = self.identity.fingerprint();
829                // Cache the salt for join_room
830                if let Some(salt) = &ann.passphrase_salt {
831                    ROOM_SALT_CACHE
832                        .lock()
833                        .unwrap()
834                        .insert(ann.room_id.clone(), salt.clone());
835                }
836                let discovered = DiscoveredRoom {
837                    room_id: ann.room_id.clone(),
838                    name: ann.name.clone(),
839                    encrypted: ann.encrypted,
840                    member_count: ann.member_count,
841                    creator_fingerprint: ann.creator_fingerprint.clone(),
842                    last_seen: now_unix(),
843                    restorable: false,
844                };
845                // Skip our own announcements
846                if ann.creator_fingerprint == our_fp
847                    && self.active_rooms.lock().unwrap().contains_key(&ann.room_id)
848                {
849                    // It's our room — still cache it so others can join, but don't emit.
850                    self.discovered_rooms
851                        .lock()
852                        .unwrap()
853                        .insert(ann.room_id.clone(), discovered);
854                    return;
855                }
856                self.discovered_rooms
857                    .lock()
858                    .unwrap()
859                    .insert(ann.room_id.clone(), discovered.clone());
860                let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
861            }
862            NetworkEvent::RoomMessageReceived {
863                room_id,
864                payload,
865                from_peer: _,
866            } => {
867                let msg: RoomMessage = match serde_json::from_slice(&payload) {
868                    Ok(m) => m,
869                    Err(e) => {
870                        warn!(%e, "bad room message");
871                        return;
872                    }
873                };
874                self.handle_room_message(&room_id, msg).await;
875            }
876            NetworkEvent::DialSucceeded { peer_id, address } => {
877                let addr_s = address.to_string();
878                self.connected_dial_addrs
879                    .lock()
880                    .unwrap()
881                    .insert(addr_s.clone(), peer_id);
882                let _ = repo::upsert_known_peer(
883                    &self.db,
884                    &KnownPeer {
885                        address: addr_s.clone(),
886                        label: None,
887                        last_connected_at: Some(now_unix()),
888                        last_attempt_at: Some(now_unix()),
889                        created_at: now_unix(),
890                    },
891                );
892                let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
893                    address: addr_s,
894                    peer_id,
895                });
896            }
897            NetworkEvent::DialFailed { address, error } => {
898                let addr_s = address.to_string();
899                let _ = self.app_event_tx.send(AppEvent::DialFailed {
900                    address: addr_s,
901                    error,
902                });
903            }
904        }
905    }
906
907    async fn handle_room_message(&self, room_id: &str, msg: RoomMessage) {
908        let our_fp = self.identity.fingerprint().to_string();
909        match msg {
910            RoomMessage::MemberAnnounce {
911                sender_fingerprint,
912                wrapped_session_key,
913                display_name,
914            } => {
915                if sender_fingerprint == our_fp {
916                    return;
917                }
918                let need_inbound = {
919                    let mut rooms = self.active_rooms.lock().unwrap();
920                    let room = match rooms.get_mut(room_id) {
921                        Some(r) => r,
922                        None => return,
923                    };
924                    let newly_added = room.members.insert(sender_fingerprint.clone());
925                    if newly_added {
926                        let _ = self.app_event_tx.send(AppEvent::MemberJoined {
927                            room_id: room_id.to_string(),
928                            fingerprint: sender_fingerprint.clone(),
929                        });
930                    }
931                    // Persist member with optional display name.
932                    let _ = repo::upsert_room_member(
933                        &self.db,
934                        &StoredRoomMember {
935                            room_id: room_id.to_string(),
936                            peer_id: String::new(), // unknown at this layer
937                            fingerprint: sender_fingerprint.clone(),
938                            last_seen: Some(now_unix()),
939                            verified: false,
940                        },
941                    );
942                    if let Some(name) = display_name.as_deref() {
943                        let _ = repo::set_member_display_name(
944                            &self.db,
945                            room_id,
946                            &sender_fingerprint,
947                            Some(name),
948                        );
949                    }
950                    room.info.encrypted && wrapped_session_key.is_some()
951                };
952
953                if need_inbound {
954                    let wrapped = wrapped_session_key.unwrap();
955                    let result = {
956                        let mut rooms = self.active_rooms.lock().unwrap();
957                        let room = rooms.get_mut(room_id).unwrap();
958                        let passphrase_key = match &room.passphrase_key {
959                            Some(k) => k,
960                            None => {
961                                warn!("no passphrase key when receiving session key");
962                                return;
963                            }
964                        };
965                        match passphrase::unwrap(&wrapped, passphrase_key) {
966                            Ok(plain) => match String::from_utf8(plain) {
967                                Ok(key_b64) => {
968                                    let crypto = room.crypto.as_mut().unwrap();
969                                    crypto.add_inbound_session(&sender_fingerprint, &key_b64)
970                                }
971                                Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
972                            },
973                            Err(e) => Err(e),
974                        }
975                    };
976                    if let Err(e) = result {
977                        error!(%e, "add inbound session failed");
978                    }
979                }
980            }
981            RoomMessage::SessionKeyRequest {
982                requester_fingerprint,
983            } => {
984                if requester_fingerprint == our_fp {
985                    return;
986                }
987                // Re-announce ourselves to share our session key with the new joiner.
988                if let Err(e) = self.broadcast_member_announce(room_id).await {
989                    warn!(%e, "broadcast member announce on request");
990                }
991            }
992            RoomMessage::Encrypted {
993                sender_fingerprint,
994                session_id,
995                ciphertext_b64,
996            } => {
997                if sender_fingerprint == our_fp {
998                    return;
999                }
1000                let ct_bytes = match base64::Engine::decode(
1001                    &base64::engine::general_purpose::STANDARD,
1002                    &ciphertext_b64,
1003                ) {
1004                    Ok(b) => b,
1005                    Err(e) => {
1006                        warn!(%e, "bad base64 ciphertext");
1007                        return;
1008                    }
1009                };
1010                let plaintext = {
1011                    let mut rooms = self.active_rooms.lock().unwrap();
1012                    let room = match rooms.get_mut(room_id) {
1013                        Some(r) => r,
1014                        None => return,
1015                    };
1016                    let crypto = match room.crypto.as_mut() {
1017                        Some(c) => c,
1018                        None => return,
1019                    };
1020                    crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
1021                };
1022                match plaintext {
1023                    Ok(pt) => {
1024                        let body = String::from_utf8_lossy(&pt).to_string();
1025                        let sent_at = now_unix();
1026                        let _ = repo::insert_room_message(
1027                            &self.db,
1028                            room_id,
1029                            &sender_fingerprint,
1030                            "in",
1031                            &body,
1032                            sent_at,
1033                        );
1034                        let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1035                        self.maybe_emit_mention(room_id, &body);
1036                        let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1037                            room_id: room_id.to_string(),
1038                            sender_fingerprint,
1039                            body,
1040                            sent_at,
1041                        });
1042                    }
1043                    Err(e) => {
1044                        debug!(%e, "decrypt failed (probably missing session key)");
1045                    }
1046                }
1047            }
1048            RoomMessage::Plain {
1049                sender_fingerprint,
1050                body,
1051            } => {
1052                if sender_fingerprint == our_fp {
1053                    return;
1054                }
1055                let sent_at = now_unix();
1056                let _ = repo::insert_room_message(
1057                    &self.db,
1058                    room_id,
1059                    &sender_fingerprint,
1060                    "in",
1061                    &body,
1062                    sent_at,
1063                );
1064                let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1065                self.maybe_emit_mention(room_id, &body);
1066                let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1067                    room_id: room_id.to_string(),
1068                    sender_fingerprint,
1069                    body,
1070                    sent_at,
1071                });
1072            }
1073            RoomMessage::Typing { sender_fingerprint } => {
1074                if sender_fingerprint == our_fp {
1075                    return;
1076                }
1077                let expiry = now_unix() + TYPING_TTL_SECS;
1078                let mut rooms = self.active_rooms.lock().unwrap();
1079                if let Some(room) = rooms.get_mut(room_id) {
1080                    room.typers.insert(sender_fingerprint, expiry);
1081                }
1082                drop(rooms);
1083                let _ = self.app_event_tx.send(AppEvent::TypingChanged {
1084                    room_id: room_id.to_string(),
1085                });
1086            }
1087            RoomMessage::RotateRoomKey {
1088                rotator_fingerprint,
1089                new_salt,
1090            } => {
1091                if rotator_fingerprint == our_fp {
1092                    return;
1093                }
1094                let _ = self.app_event_tx.send(AppEvent::RotationRequested {
1095                    room_id: room_id.to_string(),
1096                    rotator_fingerprint,
1097                    new_salt,
1098                });
1099            }
1100            RoomMessage::MemberLeave { sender_fingerprint } => {
1101                if sender_fingerprint == our_fp {
1102                    return;
1103                }
1104                let removed = {
1105                    let mut rooms = self.active_rooms.lock().unwrap();
1106                    if let Some(room) = rooms.get_mut(room_id) {
1107                        room.members.remove(&sender_fingerprint)
1108                    } else {
1109                        false
1110                    }
1111                };
1112                if removed {
1113                    let _ = self.app_event_tx.send(AppEvent::MemberLeft {
1114                        room_id: room_id.to_string(),
1115                        fingerprint: sender_fingerprint,
1116                    });
1117                }
1118            }
1119            RoomMessage::FileOffer {
1120                sender_fingerprint,
1121                file_id,
1122                name,
1123                size_bytes,
1124                mime,
1125                chunk_count,
1126                encrypted_meta,
1127            } => {
1128                if sender_fingerprint == our_fp {
1129                    return; // ignore our own broadcast
1130                }
1131                self.handle_file_offer(
1132                    room_id,
1133                    sender_fingerprint,
1134                    file_id,
1135                    name,
1136                    size_bytes,
1137                    mime,
1138                    chunk_count,
1139                    encrypted_meta,
1140                );
1141            }
1142            RoomMessage::FileChunk {
1143                sender_fingerprint,
1144                file_id,
1145                chunk_index,
1146                total_chunks,
1147                data_b64,
1148            } => {
1149                if sender_fingerprint == our_fp {
1150                    return;
1151                }
1152                self.handle_file_chunk(
1153                    room_id,
1154                    sender_fingerprint,
1155                    file_id,
1156                    chunk_index,
1157                    total_chunks,
1158                    data_b64,
1159                );
1160            }
1161        }
1162    }
1163
1164    // -------------------------------------------------------------------
1165    // File transfer — public API
1166    // -------------------------------------------------------------------
1167
1168    /// Send a local file to a room. Reads the file, optionally encrypts
1169    /// it for encrypted rooms, chunks it, broadcasts a FileOffer then
1170    /// each FileChunk. Returns the file_id once all chunks are queued.
1171    pub async fn send_file(&self, room_id: &str, path: &Path) -> Result<String> {
1172        let bytes = std::fs::read(path)?;
1173        let name = path
1174            .file_name()
1175            .map(|n| n.to_string_lossy().to_string())
1176            .unwrap_or_else(|| "untitled".into());
1177        let mime = crate::files::guess_mime(&name);
1178        let original_path = path.to_path_buf();
1179
1180        let (room_encrypted, mut maybe_session_id, encrypted_meta_opt, wire_bytes) = {
1181            let mut rooms = self.active_rooms.lock().unwrap();
1182            let room = rooms
1183                .get_mut(room_id)
1184                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1185            if room.info.encrypted {
1186                let crypto = room
1187                    .crypto
1188                    .as_mut()
1189                    .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
1190                let (ciphertext, meta) = file_encryption::encrypt_file(&bytes, crypto)?;
1191                (true, Some(meta.megolm_session_id.clone()), Some(meta), ciphertext)
1192            } else {
1193                (false, None, None, bytes)
1194            }
1195        };
1196        let _ = &mut maybe_session_id; // silence unused warning when non-encrypted
1197
1198        let plan =
1199            self.file_manager
1200                .prepare_outgoing_from_bytes(&name, mime.clone(), wire_bytes)?;
1201        let file_id = plan.file_id.clone();
1202        let total = plan.chunks.len() as u32;
1203        let our_fp = self.identity.fingerprint().to_string();
1204
1205        let attachment = StoredAttachment {
1206            id: 0,
1207            room_id: room_id.to_string(),
1208            message_id: None,
1209            sender_fingerprint: our_fp.clone(),
1210            file_id: file_id.clone(),
1211            name: name.clone(),
1212            mime: mime.clone(),
1213            size_bytes: plan.size_bytes as i64,
1214            status: AttachmentStatus::Ready,
1215            cache_path: Some(self.file_manager.cache_path(&file_id).to_string_lossy().into()),
1216            saved_path: Some(original_path.to_string_lossy().into()),
1217            error: None,
1218            encrypted: room_encrypted,
1219            wrapped_key: encrypted_meta_opt.as_ref().map(|m| m.wrapped_key_b64.clone()),
1220            nonce: encrypted_meta_opt.as_ref().map(|m| m.nonce_b64.clone()),
1221            megolm_session_id: encrypted_meta_opt
1222                .as_ref()
1223                .map(|m| m.megolm_session_id.clone()),
1224            created_at: now_unix(),
1225        };
1226        repo::upsert_attachment(&self.db, &attachment)?;
1227        let _ = self.app_event_tx.send(AppEvent::FileOffered {
1228            room_id: room_id.to_string(),
1229            file_id: file_id.clone(),
1230            name: name.clone(),
1231            size_bytes: plan.size_bytes,
1232            sender_fingerprint: our_fp.clone(),
1233        });
1234
1235        // Publish the offer.
1236        let offer = RoomMessage::FileOffer {
1237            sender_fingerprint: our_fp.clone(),
1238            file_id: file_id.clone(),
1239            name,
1240            size_bytes: plan.size_bytes,
1241            mime,
1242            chunk_count: total,
1243            encrypted_meta: encrypted_meta_opt,
1244        };
1245        if let Ok(bytes) = serde_json::to_vec(&offer) {
1246            self.network
1247                .publish_room_message(room_id.to_string(), bytes)
1248                .await;
1249        }
1250
1251        // Stream chunks. Brief pacing so gossipsub doesn't see a thundering
1252        // herd from a single peer.
1253        let net = self.network.clone();
1254        let room = room_id.to_string();
1255        let our = our_fp.clone();
1256        let fid = file_id.clone();
1257        let chunks = plan.chunks.clone();
1258        tokio::spawn(async move {
1259            for (i, data) in chunks.iter().enumerate() {
1260                let msg = RoomMessage::FileChunk {
1261                    sender_fingerprint: our.clone(),
1262                    file_id: fid.clone(),
1263                    chunk_index: i as u32,
1264                    total_chunks: total,
1265                    data_b64: B64.encode(data),
1266                };
1267                if let Ok(bytes) = serde_json::to_vec(&msg) {
1268                    net.publish_room_message(room.clone(), bytes).await;
1269                }
1270                tokio::time::sleep(Duration::from_millis(40)).await;
1271            }
1272        });
1273
1274        Ok(file_id)
1275    }
1276
1277    /// Save a completed/ready attachment to the user's Downloads folder.
1278    /// Decrypts encrypted attachments on the way out.
1279    pub async fn save_to_downloads(&self, room_id: &str, file_id: &str) -> Result<PathBuf> {
1280        let attachment = repo::get_attachment(&self.db, room_id, file_id)?
1281            .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
1282        if !matches!(
1283            attachment.status,
1284            AttachmentStatus::Ready | AttachmentStatus::Saved
1285        ) {
1286            return Err(HuddleError::Other(format!(
1287                "attachment is not ready (status={})",
1288                attachment.status.as_str()
1289            )));
1290        }
1291        let cached = self.file_manager.read_cache(file_id)?;
1292        let plaintext = if attachment.encrypted {
1293            let meta = EncryptedFileMeta {
1294                megolm_session_id: attachment
1295                    .megolm_session_id
1296                    .clone()
1297                    .ok_or_else(|| HuddleError::Other("missing megolm_session_id".into()))?,
1298                wrapped_key_b64: attachment
1299                    .wrapped_key
1300                    .clone()
1301                    .ok_or_else(|| HuddleError::Other("missing wrapped_key".into()))?,
1302                nonce_b64: attachment
1303                    .nonce
1304                    .clone()
1305                    .ok_or_else(|| HuddleError::Other("missing nonce".into()))?,
1306            };
1307            // For our own sent files we don't have an inbound session
1308            // keyed by ourselves; the file_manager cache for the sender
1309            // already holds the ciphertext. To open our own attachment
1310            // we just open the original saved_path via open_saved (which
1311            // doesn't decrypt — the bytes already exist on disk).
1312            if attachment.sender_fingerprint == self.identity.fingerprint() {
1313                return Err(HuddleError::Other(
1314                    "this attachment is your own — use [o] open to open the source file".into(),
1315                ));
1316            }
1317            self.decrypt_attachment(room_id, &attachment.sender_fingerprint, &cached, &meta)?
1318        } else {
1319            cached
1320        };
1321        let saved = self.file_manager.write_to_downloads(&attachment.name, &plaintext)?;
1322        repo::update_attachment_paths(
1323            &self.db,
1324            room_id,
1325            file_id,
1326            None,
1327            Some(&saved.to_string_lossy()),
1328        )?;
1329        repo::update_attachment_status(&self.db, room_id, file_id, AttachmentStatus::Saved, None)?;
1330        let _ = self.app_event_tx.send(AppEvent::FileSaved {
1331            file_id: file_id.into(),
1332            path: saved.to_string_lossy().into(),
1333        });
1334        Ok(saved)
1335    }
1336
1337    /// Drop any in-flight chunks and remove the attachment row.
1338    pub async fn cancel_transfer(&self, room_id: &str, file_id: &str) -> Result<()> {
1339        self.file_manager.cancel_incoming(file_id);
1340        repo::update_attachment_status(
1341            &self.db,
1342            room_id,
1343            file_id,
1344            AttachmentStatus::Cancelled,
1345            None,
1346        )?;
1347        Ok(())
1348    }
1349
1350    /// Launch the system's default opener on a saved file.
1351    pub fn open_saved(&self, room_id: &str, file_id: &str) -> Result<()> {
1352        let attachment = repo::get_attachment(&self.db, room_id, file_id)?
1353            .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
1354        let path = attachment
1355            .saved_path
1356            .ok_or_else(|| HuddleError::Other("not saved yet — press Enter to save first".into()))?;
1357        open_with_system(&path)
1358    }
1359
1360    pub fn list_room_attachments(&self, room_id: &str) -> Result<Vec<StoredAttachment>> {
1361        repo::list_room_attachments(&self.db, room_id)
1362    }
1363
1364    /// Mark a peer's fingerprint as verified in the given room. Used by
1365    /// the `^V` verification modal after the user has compared the
1366    /// fingerprint out-of-band.
1367    pub fn set_member_verified(
1368        &self,
1369        room_id: &str,
1370        fingerprint: &str,
1371        verified: bool,
1372    ) -> Result<()> {
1373        // Make sure there's a member row to flip — peer_id is unknown
1374        // at this layer when the user verifies an out-of-band identity,
1375        // so we use the fingerprint as the canonical identity key with
1376        // an empty peer_id placeholder if none exists.
1377        let members = repo::list_room_members(&self.db, room_id).unwrap_or_default();
1378        if !members.iter().any(|m| m.fingerprint == fingerprint) {
1379            repo::upsert_room_member(
1380                &self.db,
1381                &StoredRoomMember {
1382                    room_id: room_id.to_string(),
1383                    peer_id: String::new(),
1384                    fingerprint: fingerprint.to_string(),
1385                    last_seen: Some(now_unix()),
1386                    verified,
1387                },
1388            )?;
1389        }
1390        repo::set_member_verified(&self.db, room_id, fingerprint, verified)
1391    }
1392
1393    pub fn verified_fingerprints(&self, room_id: &str) -> Vec<String> {
1394        repo::list_verified_fingerprints(&self.db, room_id).unwrap_or_default()
1395    }
1396
1397    pub fn display_name(&self) -> Option<String> {
1398        repo::get_display_name(&self.db).unwrap_or(None)
1399    }
1400
1401    pub fn set_display_name(&self, name: Option<&str>) -> Result<()> {
1402        repo::set_display_name(&self.db, name)
1403    }
1404
1405    /// Look up the display name we've seen for a peer in any room.
1406    pub fn lookup_member_display_name(&self, fingerprint: &str) -> Option<String> {
1407        repo::lookup_display_name(&self.db, fingerprint).unwrap_or(None)
1408    }
1409
1410    pub fn is_room_muted(&self, room_id: &str) -> bool {
1411        repo::is_room_muted(&self.db, room_id).unwrap_or(false)
1412    }
1413
1414    pub fn set_room_muted(&self, room_id: &str, muted: bool) -> Result<()> {
1415        repo::set_room_muted(&self.db, room_id, muted)
1416    }
1417
1418    /// Broadcast a "I'm typing" pulse to the given room. Caller is
1419    /// responsible for debouncing (don't fire more than every ~500ms).
1420    pub async fn broadcast_typing(&self, room_id: &str) {
1421        if !self.active_rooms.lock().unwrap().contains_key(room_id) {
1422            return;
1423        }
1424        let msg = RoomMessage::Typing {
1425            sender_fingerprint: self.identity.fingerprint().to_string(),
1426        };
1427        if let Ok(bytes) = serde_json::to_vec(&msg) {
1428            self.network
1429                .publish_room_message(room_id.to_string(), bytes)
1430                .await;
1431        }
1432    }
1433
1434    /// Returns the fingerprints of peers currently typing in `room_id`,
1435    /// pruning entries past their TTL.
1436    pub fn typers_in_room(&self, room_id: &str) -> Vec<String> {
1437        let now = now_unix();
1438        let mut rooms = self.active_rooms.lock().unwrap();
1439        let room = match rooms.get_mut(room_id) {
1440            Some(r) => r,
1441            None => return Vec::new(),
1442        };
1443        room.typers.retain(|_, exp| *exp > now);
1444        let mut v: Vec<String> = room.typers.keys().cloned().collect();
1445        v.sort();
1446        v
1447    }
1448
1449    // -------------------------------------------------------------------
1450    // Room key rotation
1451    // -------------------------------------------------------------------
1452
1453    /// Rotate this room's outbound Megolm session under a fresh
1454    /// passphrase. Broadcasts `RotateRoomKey` (so other members know to
1455    /// expect a new passphrase) and a fresh `MemberAnnounce` with the
1456    /// new wrapped session key. Old inbound sessions stay in storage
1457    /// for decrypting historic messages.
1458    pub async fn rotate_room(&self, room_id: &str, new_passphrase: &str) -> Result<()> {
1459        if new_passphrase.is_empty() {
1460            return Err(HuddleError::Other("new passphrase is empty".into()));
1461        }
1462        let new_salt = passphrase::random_salt();
1463        let new_key = passphrase::derive_key(new_passphrase, &new_salt)?;
1464
1465        let info = {
1466            let mut rooms = self.active_rooms.lock().unwrap();
1467            let room = rooms
1468                .get_mut(room_id)
1469                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1470            if !room.info.encrypted {
1471                return Err(HuddleError::Other(
1472                    "rotation only applies to encrypted rooms".into(),
1473                ));
1474            }
1475            // Generate a fresh outbound Megolm session for this member.
1476            let new_crypto = RoomCrypto::new_for_room(
1477                self.db.clone(),
1478                room_id.to_string(),
1479                self.identity.fingerprint().to_string(),
1480            )?;
1481            room.crypto = Some(new_crypto);
1482            room.passphrase_key = Some(new_key);
1483            room.info.passphrase_salt = Some(new_salt.to_vec());
1484            room.info.clone()
1485        };
1486
1487        // Persist the new salt on the stored row.
1488        repo::insert_room(&self.db, &info)?;
1489
1490        // Broadcast the rotation signal.
1491        let rot = RoomMessage::RotateRoomKey {
1492            rotator_fingerprint: self.identity.fingerprint().to_string(),
1493            new_salt: new_salt.to_vec(),
1494        };
1495        if let Ok(bytes) = serde_json::to_vec(&rot) {
1496            self.network
1497                .publish_room_message(room_id.to_string(), bytes)
1498                .await;
1499        }
1500
1501        // Re-announce ourselves with the new wrapped session key.
1502        if let Err(e) = self.broadcast_member_announce(room_id).await {
1503            warn!(%e, "rotate: broadcast announce failed");
1504        }
1505        Ok(())
1506    }
1507
1508    /// Used by the TUI when another member rotates a room we're in.
1509    /// Derives the new key, updates our local state, and re-announces
1510    /// so the rotator can share their fresh outbound session with us.
1511    pub async fn accept_rotation(
1512        &self,
1513        room_id: &str,
1514        new_salt: &[u8],
1515        new_passphrase: &str,
1516    ) -> Result<()> {
1517        let new_key = passphrase::derive_key(new_passphrase, new_salt)?;
1518        let info = {
1519            let mut rooms = self.active_rooms.lock().unwrap();
1520            let room = rooms
1521                .get_mut(room_id)
1522                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1523            room.passphrase_key = Some(new_key);
1524            room.info.passphrase_salt = Some(new_salt.to_vec());
1525            room.info.clone()
1526        };
1527        repo::insert_room(&self.db, &info)?;
1528        // Ask the rotator (and anyone) to re-share their session key.
1529        let req = RoomMessage::SessionKeyRequest {
1530            requester_fingerprint: self.identity.fingerprint().to_string(),
1531        };
1532        if let Ok(bytes) = serde_json::to_vec(&req) {
1533            self.network
1534                .publish_room_message(room_id.to_string(), bytes)
1535                .await;
1536        }
1537        Ok(())
1538    }
1539
1540    // -------------------------------------------------------------------
1541    // File transfer — internal handlers
1542    // -------------------------------------------------------------------
1543
1544    #[allow(clippy::too_many_arguments)]
1545    fn handle_file_offer(
1546        &self,
1547        room_id: &str,
1548        sender_fingerprint: String,
1549        file_id: String,
1550        name: String,
1551        size_bytes: u64,
1552        mime: Option<String>,
1553        _chunk_count: u32,
1554        encrypted_meta: Option<EncryptedFileMeta>,
1555    ) {
1556        let encrypted = encrypted_meta.is_some();
1557        let attachment = StoredAttachment {
1558            id: 0,
1559            room_id: room_id.to_string(),
1560            message_id: None,
1561            sender_fingerprint: sender_fingerprint.clone(),
1562            file_id: file_id.clone(),
1563            name: name.clone(),
1564            mime,
1565            size_bytes: size_bytes as i64,
1566            status: AttachmentStatus::Offered,
1567            cache_path: None,
1568            saved_path: None,
1569            error: None,
1570            encrypted,
1571            wrapped_key: encrypted_meta.as_ref().map(|m| m.wrapped_key_b64.clone()),
1572            nonce: encrypted_meta.as_ref().map(|m| m.nonce_b64.clone()),
1573            megolm_session_id: encrypted_meta.as_ref().map(|m| m.megolm_session_id.clone()),
1574            created_at: now_unix(),
1575        };
1576        if let Err(e) = repo::upsert_attachment(&self.db, &attachment) {
1577            warn!(%e, "upsert attachment");
1578            return;
1579        }
1580        let _ = self.app_event_tx.send(AppEvent::FileOffered {
1581            room_id: room_id.to_string(),
1582            file_id,
1583            name,
1584            size_bytes,
1585            sender_fingerprint,
1586        });
1587    }
1588
1589    fn handle_file_chunk(
1590        &self,
1591        room_id: &str,
1592        _sender_fingerprint: String,
1593        file_id: String,
1594        chunk_index: u32,
1595        total_chunks: u32,
1596        data_b64: String,
1597    ) {
1598        let data = match B64.decode(&data_b64) {
1599            Ok(d) => d,
1600            Err(e) => {
1601                warn!(%e, "bad chunk base64");
1602                return;
1603            }
1604        };
1605        // Pull the announced size from our stored offer.
1606        let expected_size = repo::get_attachment(&self.db, room_id, &file_id)
1607            .ok()
1608            .flatten()
1609            .map(|a| a.size_bytes as u64)
1610            .unwrap_or(crate::files::MAX_FILE_SIZE);
1611
1612        let result = self.file_manager.accept_chunk(
1613            &file_id,
1614            chunk_index,
1615            total_chunks,
1616            data,
1617            expected_size,
1618        );
1619        match result {
1620            Ok(None) => {
1621                // Move offered → downloading on first chunk.
1622                let _ = repo::update_attachment_status(
1623                    &self.db,
1624                    room_id,
1625                    &file_id,
1626                    AttachmentStatus::Downloading,
1627                    None,
1628                );
1629                // Best-effort progress event — we know we've processed
1630                // (chunk_index+1)/total_chunks chunks.
1631                let bytes_so_far = self
1632                    .file_manager
1633                    .progress(&file_id)
1634                    .map(|(b, _)| b)
1635                    .unwrap_or(0);
1636                let _ = self.app_event_tx.send(AppEvent::FileProgress {
1637                    file_id: file_id.clone(),
1638                    bytes_received: bytes_so_far,
1639                    total_bytes: expected_size,
1640                });
1641            }
1642            Ok(Some(completed)) => {
1643                let _ = repo::update_attachment_paths(
1644                    &self.db,
1645                    room_id,
1646                    &file_id,
1647                    Some(&completed.cache_path.to_string_lossy()),
1648                    None,
1649                );
1650                let _ = repo::update_attachment_status(
1651                    &self.db,
1652                    room_id,
1653                    &file_id,
1654                    AttachmentStatus::Ready,
1655                    None,
1656                );
1657                let _ = self.app_event_tx.send(AppEvent::FileReady {
1658                    file_id: file_id.clone(),
1659                });
1660            }
1661            Err(e) => {
1662                let msg = e.to_string();
1663                warn!(%msg, "chunk processing failed");
1664                let _ = repo::update_attachment_status(
1665                    &self.db,
1666                    room_id,
1667                    &file_id,
1668                    AttachmentStatus::Failed,
1669                    Some(&msg),
1670                );
1671                let _ = self.app_event_tx.send(AppEvent::FileFailed {
1672                    file_id: file_id.clone(),
1673                    reason: msg,
1674                });
1675            }
1676        }
1677    }
1678
1679    /// Emit MentionReceived if `body` contains either our full
1680    /// fingerprint or its short form (first hex group).
1681    fn maybe_emit_mention(&self, room_id: &str, body: &str) {
1682        let full = self.identity.fingerprint();
1683        let short = full.split('-').next().unwrap_or(full);
1684        let lower = body.to_lowercase();
1685        if lower.contains(&full.to_lowercase()) || lower.contains(&short.to_lowercase()) {
1686            let _ = self.app_event_tx.send(AppEvent::MentionReceived {
1687                room_id: room_id.to_string(),
1688                body: body.to_string(),
1689            });
1690        }
1691    }
1692
1693    fn decrypt_attachment(
1694        &self,
1695        room_id: &str,
1696        sender_fingerprint: &str,
1697        ciphertext: &[u8],
1698        meta: &EncryptedFileMeta,
1699    ) -> Result<Vec<u8>> {
1700        let mut rooms = self.active_rooms.lock().unwrap();
1701        let room = rooms
1702            .get_mut(room_id)
1703            .ok_or_else(|| HuddleError::Other("not in room".into()))?;
1704        let crypto = room
1705            .crypto
1706            .as_mut()
1707            .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
1708        file_encryption::decrypt_file(ciphertext, meta, crypto, sender_fingerprint)
1709    }
1710}
1711
1712/// Use the platform's default opener on `path`.
1713fn open_with_system(path: &str) -> Result<()> {
1714    #[cfg(target_os = "macos")]
1715    let cmd = "open";
1716    #[cfg(target_os = "linux")]
1717    let cmd = "xdg-open";
1718    #[cfg(target_os = "windows")]
1719    let cmd = "cmd";
1720    #[cfg(target_os = "windows")]
1721    let args = vec!["/C", "start", "", path];
1722    #[cfg(not(target_os = "windows"))]
1723    let args = vec![path];
1724
1725    std::process::Command::new(cmd)
1726        .args(args)
1727        .spawn()
1728        .map_err(|e| HuddleError::Other(format!("spawn opener: {e}")))?;
1729    Ok(())
1730}
1731
/// Module-level salt cache, keyed by `room_id` and holding that room's salt
/// bytes. It starts out empty and is created on first access.
///
/// Populated when we receive announcements; queried by `join_room`.
static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
    std::sync::LazyLock::new(Default::default);
1736
1737#[allow(dead_code)]
1738fn salt_len() -> usize {
1739    SALT_LEN
1740}
1741
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Never panics: if the system clock is set before the epoch the result is
/// the negated whole-second distance to the epoch, and values that do not
/// fit in `i64` saturate at `i64::MAX` / `i64::MIN`.
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX),
        // `SystemTimeError::duration` is how far before the epoch we are.
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_secs())
            .map(|secs| -secs)
            .unwrap_or(i64::MIN),
    }
}
1748
/// Unit tests for `parse_dial_address`: accepted input forms and rejected
/// input.
#[cfg(test)]
mod parser_tests {
    use super::parse_dial_address;

    /// `ip:port` becomes an `/ip4/.../tcp/...` multiaddr.
    #[test]
    fn parses_ipv4_port() {
        let rendered = parse_dial_address("10.3.72.53:9027").unwrap().to_string();
        assert_eq!(rendered, "/ip4/10.3.72.53/tcp/9027");
    }

    /// `[ipv6]:port` becomes an `/ip6/.../tcp/...` multiaddr.
    #[test]
    fn parses_bracketed_ipv6() {
        let rendered = parse_dial_address("[::1]:9027").unwrap().to_string();
        assert_eq!(rendered, "/ip6/::1/tcp/9027");
    }

    /// IPv6 without brackets is rejected with an error that mentions brackets.
    #[test]
    fn rejects_unbracketed_ipv6() {
        let message = parse_dial_address("fe80::1:9027").unwrap_err().to_string();
        assert!(message.contains("brackets"));
    }

    /// Input starting with `/` is parsed as a multiaddr unchanged.
    #[test]
    fn passes_through_raw_multiaddr() {
        let raw = "/ip4/1.2.3.4/tcp/9000";
        let rendered = parse_dial_address(raw).unwrap().to_string();
        assert_eq!(rendered, "/ip4/1.2.3.4/tcp/9000");
    }

    /// Whitespace-only input is an error.
    #[test]
    fn empty_address_is_error() {
        let outcome = parse_dial_address("   ");
        assert!(outcome.is_err());
    }

    /// A non-numeric port is an error.
    #[test]
    fn rejects_bad_port() {
        let outcome = parse_dial_address("1.2.3.4:notaport");
        assert!(outcome.is_err());
    }
}