Skip to main content

huddle_core/app/
mod.rs

1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use base64::engine::general_purpose::STANDARD as B64;
9use base64::Engine;
10use libp2p::{Multiaddr, PeerId};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info, warn};
13
14use crate::config;
15use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
16use crate::crypto::RoomCrypto;
17use crate::error::{HuddleError, Result};
18use crate::files::encryption::{self as file_encryption, EncryptedFileMeta};
19use crate::files::FileManager;
20use crate::identity::Identity;
21use crate::network::events::NetworkEvent;
22use crate::network::protocol::{encode_wire, RoomAnnouncement, RoomMessage, WireMessage};
23use crate::network::{self, NetworkHandle, NetworkMode};
24use crate::storage::repo::{
25    self, derive_room_id, AttachmentStatus, KnownPeer, StoredAttachment, StoredRoom,
26    StoredRoomMember,
27};
28use crate::storage::{self, Db};
29
30pub use self::events::{AppEvent, DiscoveredRoom};
31
32/// Lobby-facing view of a known dial peer: persisted address plus
33/// runtime "is the connection currently up?" status.
34#[derive(Debug, Clone)]
35pub struct KnownPeerStatus {
36    pub address: String,
37    pub label: Option<String>,
38    pub last_connected_at: Option<i64>,
39    pub connected_peer_id: Option<PeerId>,
40}
41
42/// Parse a user-entered dial address into a libp2p `Multiaddr`.
43/// Accepts `ip:port`, `[ipv6]:port`, or a raw multiaddr starting with `/`.
44pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
45    let trimmed = input.trim();
46    if trimmed.is_empty() {
47        return Err(HuddleError::Other("address is empty".into()));
48    }
49    if trimmed.starts_with('/') {
50        return trimmed
51            .parse::<Multiaddr>()
52            .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
53    }
54    if let Some(rest) = trimmed.strip_prefix('[') {
55        let (host, port) = rest
56            .split_once("]:")
57            .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
58        let port: u16 = port
59            .parse()
60            .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
61        return format!("/ip6/{}/tcp/{}", host, port)
62            .parse::<Multiaddr>()
63            .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
64    }
65    let (host, port) = trimmed
66        .rsplit_once(':')
67        .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
68    if host.contains(':') {
69        return Err(HuddleError::Other(format!(
70            "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
71        )));
72    }
73    let port: u16 = port
74        .parse()
75        .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
76    format!("/ip4/{}/tcp/{}", host, port)
77        .parse::<Multiaddr>()
78        .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
79}
80
81/// State for a room we've created or joined this session.
82struct ActiveRoom {
83    info: StoredRoom,
84    crypto: Option<RoomCrypto>,
85    /// Argon2id-derived 32-byte key for unwrapping incoming session keys.
86    /// None for unencrypted rooms.
87    passphrase_key: Option<[u8; KEY_LEN]>,
88    /// Fingerprints of members currently known to be in the room.
89    members: HashSet<String>,
90    /// Ephemeral typing indicators: fingerprint → unix expiry. Pruned
91    /// on read; never persisted.
92    typers: HashMap<String, i64>,
93    /// Phase F: we joined via a short-lived code rather than the
94    /// passphrase. We have other members' session keys (delivered via
95    /// the CodeJoinResponse ECDH handshake) so we can decrypt; but
96    /// without the passphrase we can't wrap our own outbound session
97    /// key for other members. Read-only until an owner re-onboards us
98    /// with the full passphrase. Defaults false for passphrase joins.
99    #[allow(dead_code)]
100    read_only: bool,
101    /// Phase F: owner-issued join codes for this room (owner side
102    /// only). Pairs of (code, expires_at_unix). Single-use; entries
103    /// removed after a successful CodeJoinResponse goes out.
104    issued_codes: Vec<(String, i64)>,
105}
106
107const TYPING_TTL_SECS: i64 = 3;
108
109/// TTL for a discovered room before it's considered stale (re-announcements
110/// happen every 15 seconds; after 45s of silence we drop it).
111const DISCOVERED_TTL_SECS: i64 = 45;
112const ANNOUNCE_INTERVAL_SECS: u64 = 15;
113
114/// Phase G: in-flight SAS verification state, keyed by tx_id. Held in
115/// memory only; survives just long enough for the two-message
116/// handshake + the user pressing Match on both sides.
117struct SasFlow {
118    room_id: String,
119    partner_fingerprint: String,
120    our_secret: x25519_dalek::StaticSecret,
121    /// Set once we know both sides' pubkeys → the derived SAS code.
122    sas_code: Option<crate::crypto::sas::SasCode>,
123    our_confirmed: bool,
124    their_confirmed: bool,
125}
126
127#[derive(Clone)]
128pub struct AppHandle {
129    identity: Arc<Identity>,
130    network: NetworkHandle,
131    mode: NetworkMode,
132    active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
133    discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
134    /// Encrypted rooms loaded from storage that we haven't rejoined yet
135    /// in this session (their passphrase-derived key isn't in memory).
136    /// Surfaced in the lobby so the user can re-enter with passphrase.
137    restorable_rooms: Arc<Mutex<HashMap<String, StoredRoom>>>,
138    /// Peer addresses we've dialed in this process; tracks "is the
139    /// connection currently up" for known peers shown in the lobby.
140    connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
141    /// File chunking + cache + downloads.
142    file_manager: Arc<FileManager>,
143    db: Db,
144    /// 32-byte key Megolm session pickles are encrypted under at rest —
145    /// an HKDF subkey of the master key, or all-zero on the
146    /// `--no-master-passphrase` / unencrypted-DB path.
147    session_persist_key: [u8; 32],
148    /// Phase G: active SAS verifications. Keyed by tx_id (the random
149    /// 16-byte salt picked by the initiator + base64'd).
150    sas_flows: Arc<Mutex<HashMap<String, SasFlow>>>,
151    /// Phase F: ephemeral X25519 secrets the joiner is holding while
152    /// they wait for the owner's `CodeJoinResponse`. Keyed by room_id
153    /// — we only have one in-flight code join per room at a time.
154    pending_code_secrets: Arc<Mutex<HashMap<String, x25519_dalek::StaticSecret>>>,
155    app_event_tx: broadcast::Sender<AppEvent>,
156}
157
158impl AppHandle {
159    pub async fn start() -> Result<Self> {
160        Self::start_with_options(NetworkMode::Mdns, 0, None, Vec::new()).await
161    }
162
163    pub async fn start_with_options(
164        mode: NetworkMode,
165        port: u16,
166        master_key: Option<&[u8; 32]>,
167        relays: Vec<Multiaddr>,
168    ) -> Result<Self> {
169        config::ensure_data_dir()?;
170        // Megolm session state is encrypted at rest with an HKDF subkey
171        // of the master key. With no master key (--no-master-passphrase /
172        // tests) it's persisted under the all-zero key, matching the
173        // unencrypted-DB story.
174        let session_persist_key = match master_key {
175            Some(mk) => storage::keychain::derive_subkey(mk, b"megolm-persist"),
176            None => [0u8; 32],
177        };
178        let db = storage::open_db(&config::db_path(), master_key)?;
179        Self::start_with_db_and_options(db, mode, port, session_persist_key, relays).await
180    }
181
182    pub async fn start_with_db(db: Db) -> Result<Self> {
183        Self::start_with_db_and_options(db, NetworkMode::Mdns, 0, [0u8; 32], Vec::new()).await
184    }
185
186    pub async fn start_with_db_and_options(
187        db: Db,
188        mode: NetworkMode,
189        port: u16,
190        session_persist_key: [u8; 32],
191        relays: Vec<Multiaddr>,
192    ) -> Result<Self> {
193        let identity = Self::load_or_create_identity(&db)?;
194        let identity = Arc::new(identity);
195        info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, relay_count = relays.len(), "identity loaded");
196
197        let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
198        let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
199        let network =
200            network::start_network_with(&identity, net_event_tx, mode, port, relays)?;
201
202        let active_rooms = Arc::new(Mutex::new(HashMap::new()));
203        let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
204        let restorable_rooms = Arc::new(Mutex::new(HashMap::new()));
205        let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
206        let file_manager = Arc::new(FileManager::new(&config::data_dir())?);
207
208        let handle = Self {
209            identity,
210            network,
211            mode,
212            active_rooms,
213            discovered_rooms,
214            restorable_rooms,
215            connected_dial_addrs,
216            file_manager,
217            db,
218            session_persist_key,
219            sas_flows: Arc::new(Mutex::new(HashMap::new())),
220            pending_code_secrets: Arc::new(Mutex::new(HashMap::new())),
221            app_event_tx,
222        };
223
224        handle.spawn_event_processor(net_event_rx);
225        handle.spawn_announcement_ticker();
226        handle.spawn_discovered_room_pruner();
227        handle.spawn_known_peer_reconnector();
228        handle.restore_rooms_from_db().await;
229
230        Ok(handle)
231    }
232
233    pub fn mode(&self) -> NetworkMode {
234        self.mode
235    }
236
237    pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
238        self.app_event_tx.subscribe()
239    }
240
241    pub fn fingerprint(&self) -> &str {
242        self.identity.fingerprint()
243    }
244
245    pub fn peer_id(&self) -> PeerId {
246        self.identity.peer_id()
247    }
248
249    pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
250        let now = now_unix();
251        let mut by_id: HashMap<String, DiscoveredRoom> = self
252            .discovered_rooms
253            .lock()
254            .unwrap()
255            .clone();
256
257        // Merge in rooms we're currently in — gossipsub doesn't echo our
258        // own announcements back to us, so without this our own hosted
259        // rooms wouldn't appear in the lobby.
260        for room in self.active_rooms.lock().unwrap().values() {
261            let entry = DiscoveredRoom {
262                room_id: room.info.id.clone(),
263                name: room.info.name.clone(),
264                encrypted: room.info.encrypted,
265                member_count: room.members.len() as u32,
266                creator_fingerprint: room.info.creator_fingerprint.clone(),
267                last_seen: now,
268                restorable: false,
269            };
270            by_id
271                .entry(room.info.id.clone())
272                .and_modify(|d| {
273                    d.last_seen = now;
274                    if entry.member_count > d.member_count {
275                        d.member_count = entry.member_count;
276                    }
277                    d.restorable = false;
278                })
279                .or_insert(entry);
280        }
281
282        // Encrypted rooms we have on disk but haven't rejoined this
283        // session. Only surface them when no fresh discovery / active
284        // entry exists for the same room.
285        for (id, stored) in self.restorable_rooms.lock().unwrap().iter() {
286            if by_id.contains_key(id) {
287                continue;
288            }
289            by_id.insert(
290                id.clone(),
291                DiscoveredRoom {
292                    room_id: id.clone(),
293                    name: stored.name.clone(),
294                    encrypted: stored.encrypted,
295                    member_count: 0,
296                    creator_fingerprint: stored.creator_fingerprint.clone(),
297                    last_seen: stored.last_active.unwrap_or(stored.created_at),
298                    restorable: true,
299                },
300            );
301        }
302
303        let mut v: Vec<DiscoveredRoom> = by_id.into_values().collect();
304        v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
305        v
306    }
307
308    pub fn active_room_ids(&self) -> Vec<String> {
309        self.active_rooms.lock().unwrap().keys().cloned().collect()
310    }
311
312    pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
313        self.active_rooms
314            .lock()
315            .unwrap()
316            .get(room_id)
317            .map(|r| r.info.clone())
318    }
319
320    pub fn room_members(&self, room_id: &str) -> Vec<String> {
321        self.active_rooms
322            .lock()
323            .unwrap()
324            .get(room_id)
325            .map(|r| {
326                let mut m: Vec<String> = r.members.iter().cloned().collect();
327                m.sort();
328                m
329            })
330            .unwrap_or_default()
331    }
332
333    pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
334        repo::get_room_messages(&self.db, room_id, limit)
335    }
336
337    pub fn search_room_messages(
338        &self,
339        room_id: &str,
340        query: &str,
341        limit: i64,
342    ) -> Result<Vec<repo::StoredRoomMessage>> {
343        repo::search_room_messages(&self.db, room_id, query, limit)
344    }
345
346    /// Create a new room. Returns its room_id.
347    pub async fn start_room(
348        &self,
349        name: &str,
350        encrypted: bool,
351        passphrase: Option<&str>,
352    ) -> Result<String> {
353        if encrypted && passphrase.is_none() {
354            return Err(HuddleError::Other(
355                "encrypted room requires a passphrase".into(),
356            ));
357        }
358
359        let created_at = now_unix();
360        let creator_fp = self.identity.fingerprint().to_string();
361        let room_id = derive_room_id(&creator_fp, name, created_at);
362
363        let (passphrase_salt, passphrase_key) = if encrypted {
364            let salt = passphrase::random_salt();
365            let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
366            (Some(salt.to_vec()), Some(key))
367        } else {
368            (None, None)
369        };
370
371        let info = StoredRoom {
372            id: room_id.clone(),
373            name: name.to_string(),
374            creator_fingerprint: creator_fp.clone(),
375            encrypted,
376            passphrase_salt: passphrase_salt.clone(),
377            created_at,
378            last_active: Some(created_at),
379        };
380        repo::insert_room(&self.db, &info)?;
381
382        let crypto = if encrypted {
383            Some(RoomCrypto::new_for_room(
384                self.db.clone(),
385                room_id.clone(),
386                creator_fp.clone(),
387                self.session_persist_key,
388            )?)
389        } else {
390            None
391        };
392
393        let mut members = HashSet::new();
394        members.insert(creator_fp.clone());
395
396        // Phase B: the room creator is the first owner. Persisted now so
397        // the very first announcement includes our fingerprint in
398        // `owner_fingerprints`, letting joiners know who's authorized.
399        repo::upsert_room_member(
400            &self.db,
401            &StoredRoomMember {
402                room_id: room_id.clone(),
403                peer_id: String::new(),
404                fingerprint: creator_fp.clone(),
405                last_seen: Some(created_at),
406                verified: true, // we trust ourselves
407                ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
408                role: "owner".into(),
409            },
410        )?;
411
412        self.active_rooms.lock().unwrap().insert(
413            room_id.clone(),
414            ActiveRoom {
415                info: info.clone(),
416                crypto,
417                passphrase_key,
418                members,
419                typers: HashMap::new(),
420                read_only: false,
421                issued_codes: Vec::new(),
422            },
423        );
424
425        self.network.subscribe_room(room_id.clone()).await;
426        self.announce_room_now(&info, 1).await;
427
428        // Broadcast our presence in the room (with our wrapped session key
429        // if encrypted). Use a small delay so the subscription propagates.
430        let app = self.clone();
431        let rid = room_id.clone();
432        tokio::spawn(async move {
433            tokio::time::sleep(Duration::from_millis(500)).await;
434            if let Err(e) = app.broadcast_member_announce(&rid).await {
435                warn!(%e, "broadcast member announce");
436            }
437        });
438
439        let _ = self.app_event_tx.send(AppEvent::RoomJoined {
440            room_id: room_id.clone(),
441        });
442
443        Ok(room_id)
444    }
445
446    /// Join an existing room. The room may come from a live announcement
447    /// (preferred), our restorable set, or the DB directly — whichever has
448    /// the freshest copy. For encrypted rooms `passphrase` is required.
449    pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
450        // Resolve room metadata from the freshest available source.
451        let (name, creator_fingerprint, encrypted, salt_opt) = {
452            if let Some(d) = self.discovered_rooms.lock().unwrap().get(room_id).cloned() {
453                let salt = self.get_room_salt(room_id);
454                (d.name, d.creator_fingerprint, d.encrypted, salt)
455            } else if let Some(stored) = self.restorable_rooms.lock().unwrap().get(room_id).cloned()
456            {
457                (
458                    stored.name,
459                    stored.creator_fingerprint,
460                    stored.encrypted,
461                    stored.passphrase_salt,
462                )
463            } else if let Some(stored) = repo::get_room(&self.db, room_id)? {
464                (
465                    stored.name,
466                    stored.creator_fingerprint,
467                    stored.encrypted,
468                    stored.passphrase_salt,
469                )
470            } else {
471                return Err(HuddleError::Other(format!("room {room_id} not found")));
472            }
473        };
474
475        if encrypted && passphrase.is_none() {
476            return Err(HuddleError::Other(
477                "encrypted room requires a passphrase".into(),
478            ));
479        }
480
481        let passphrase_key = if encrypted {
482            let salt = salt_opt
483                .clone()
484                .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
485            Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
486        } else {
487            None
488        };
489
490        let info = StoredRoom {
491            id: room_id.to_string(),
492            name,
493            creator_fingerprint,
494            encrypted,
495            passphrase_salt: salt_opt.clone(),
496            created_at: now_unix(),
497            last_active: Some(now_unix()),
498        };
499        repo::insert_room(&self.db, &info)?;
500
501        let crypto = if encrypted {
502            // Reuse persisted Megolm sessions on re-join; only mint a fresh
503            // outbound session when nothing is stored for this room yet.
504            let our_fp = self.identity.fingerprint().to_string();
505            let existing = RoomCrypto::load(
506                self.db.clone(),
507                room_id.to_string(),
508                our_fp.clone(),
509                self.session_persist_key,
510            )?;
511            Some(match existing {
512                Some(c) => c,
513                None => RoomCrypto::new_for_room(
514                    self.db.clone(),
515                    room_id.to_string(),
516                    our_fp,
517                    self.session_persist_key,
518                )?,
519            })
520        } else {
521            None
522        };
523
524        let mut members = HashSet::new();
525        members.insert(self.identity.fingerprint().to_string());
526
527        self.active_rooms.lock().unwrap().insert(
528            room_id.to_string(),
529            ActiveRoom {
530                info: info.clone(),
531                crypto,
532                passphrase_key,
533                members,
534                typers: HashMap::new(),
535                read_only: false,
536                issued_codes: Vec::new(),
537            },
538        );
539        // No longer "restorable" now that we've rejoined.
540        self.restorable_rooms.lock().unwrap().remove(room_id);
541
542        self.network.subscribe_room(room_id.to_string()).await;
543
544        let app = self.clone();
545        let rid = room_id.to_string();
546        tokio::spawn(async move {
547            tokio::time::sleep(Duration::from_millis(500)).await;
548            if let Err(e) = app.broadcast_member_announce(&rid).await {
549                warn!(%e, "broadcast member announce");
550            }
551            // Ask existing members for their session keys.
552            let req = RoomMessage::SessionKeyRequest {
553                requester_fingerprint: app.identity.fingerprint().to_string(),
554            };
555            if let Ok(bytes) = encode_wire(&req) {
556                app.network.publish_room_message(rid.clone(), bytes).await;
557            }
558        });
559
560        let _ = self.app_event_tx.send(AppEvent::RoomJoined {
561            room_id: room_id.to_string(),
562        });
563
564        Ok(())
565    }
566
567    /// Walk the rooms table at startup. Non-encrypted rooms are silently
568    /// restored (subscribed + re-announced). Encrypted rooms get added to
569    /// `restorable_rooms` so the lobby surfaces them and the user can
570    /// re-enter via the join flow with passphrase.
571    async fn restore_rooms_from_db(&self) {
572        let rooms = match repo::list_rooms(&self.db) {
573            Ok(v) => v,
574            Err(e) => {
575                warn!(%e, "list rooms on restore");
576                return;
577            }
578        };
579        let our_fp = self.identity.fingerprint().to_string();
580        let count = rooms.len();
581        for info in rooms {
582            if info.encrypted {
583                self.restorable_rooms
584                    .lock()
585                    .unwrap()
586                    .insert(info.id.clone(), info);
587                continue;
588            }
589            let mut members = HashSet::new();
590            members.insert(our_fp.clone());
591            if let Ok(stored_members) = repo::list_room_members(&self.db, &info.id) {
592                for m in stored_members {
593                    members.insert(m.fingerprint);
594                }
595            }
596            let member_count = members.len() as u32;
597            self.active_rooms.lock().unwrap().insert(
598                info.id.clone(),
599                ActiveRoom {
600                    info: info.clone(),
601                    crypto: None,
602                    passphrase_key: None,
603                    members,
604                    typers: HashMap::new(),
605                    read_only: false,
606                    issued_codes: Vec::new(),
607                },
608            );
609            self.network.subscribe_room(info.id.clone()).await;
610            self.announce_room_now(&info, member_count).await;
611            info!(room_id = %info.id, name = %info.name, "restored room");
612        }
613        if count > 0 {
614            debug!(count, "restored rooms from db");
615        }
616    }
617
618    /// Leave a room. Returns `true` when the `MemberLeave` notice was
619    /// handed to the network layer, `false` when it couldn't be encoded
620    /// (peers then only notice via the discovered-room TTL). The local
621    /// leave always succeeds regardless.
622    pub async fn leave_room(&self, room_id: &str) -> Result<bool> {
623        // Broadcast a leave notice before unsubscribing.
624        let leave_msg = RoomMessage::MemberLeave {
625            sender_fingerprint: self.identity.fingerprint().to_string(),
626        };
627        let dispatched = match encode_wire(&leave_msg) {
628            Ok(bytes) => {
629                self.network
630                    .publish_room_message(room_id.to_string(), bytes)
631                    .await;
632                true
633            }
634            Err(e) => {
635                warn!(%e, %room_id, "failed to encode MemberLeave notice");
636                false
637            }
638        };
639
640        self.active_rooms.lock().unwrap().remove(room_id);
641        self.network.unsubscribe_room(room_id.to_string()).await;
642
643        let _ = self.app_event_tx.send(AppEvent::RoomLeft {
644            room_id: room_id.to_string(),
645        });
646        Ok(dispatched)
647    }
648
649    pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
650        let our_fp = self.identity.fingerprint().to_string();
651        let msg = {
652            let mut rooms = self.active_rooms.lock().unwrap();
653            let room = rooms
654                .get_mut(room_id)
655                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
656
657            if room.read_only {
658                return Err(HuddleError::Other(
659                    "this room is read-only — you joined via code without the passphrase. Ask an owner for the passphrase or wait for a key rotation that includes you.".into(),
660                ));
661            }
662
663            if room.info.encrypted {
664                let crypto = room
665                    .crypto
666                    .as_mut()
667                    .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
668                let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
669                RoomMessage::Encrypted {
670                    sender_fingerprint: our_fp.clone(),
671                    session_id,
672                    ciphertext_b64: base64::Engine::encode(
673                        &base64::engine::general_purpose::STANDARD,
674                        &ct_bytes,
675                    ),
676                }
677            } else {
678                RoomMessage::Plain {
679                    sender_fingerprint: our_fp.clone(),
680                    body: body.to_string(),
681                }
682            }
683        };
684
685        let bytes = encode_wire(&msg)?;
686        self.network
687            .publish_room_message(room_id.to_string(), bytes)
688            .await;
689
690        let now = now_unix();
691        let msg_id =
692            repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
693        repo::update_room_last_active(&self.db, room_id, now)?;
694
695        let _ = self.app_event_tx.send(AppEvent::MessageSent {
696            room_id: room_id.to_string(),
697            body: body.to_string(),
698            message_id: msg_id,
699        });
700
701        Ok(())
702    }
703
704    pub async fn shutdown(&self) {
705        self.network.shutdown().await;
706    }
707
708    // -------------------------------------------------------------------
709    // Dial / known peers
710    // -------------------------------------------------------------------
711
712    /// Dial a peer by a user-entered address. Accepts:
713    /// - `1.2.3.4:9000`
714    /// - `[fe80::1]:9000`
715    /// - `/ip4/.../tcp/...[/p2p/<peer>]` (raw multiaddr)
716    pub async fn dial(&self, input: &str) -> Result<()> {
717        let multiaddr = parse_dial_address(input)?;
718        let canonical = multiaddr.to_string();
719        info!(%canonical, "dialing");
720
721        repo::upsert_known_peer(
722            &self.db,
723            &KnownPeer {
724                address: canonical.clone(),
725                label: None,
726                last_connected_at: None,
727                last_attempt_at: Some(now_unix()),
728                created_at: now_unix(),
729                // Fingerprint isn't known until Identify lands after the
730                // dial completes; the connection-success handler upserts
731                // again with the fingerprint and trusted=true.
732                fingerprint: None,
733                trusted: false,
734            },
735        )?;
736
737        let _ = self.app_event_tx.send(AppEvent::Dialing {
738            address: canonical.clone(),
739        });
740        self.network.dial(multiaddr).await;
741        Ok(())
742    }
743
744    pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
745        let connected = self.connected_dial_addrs.lock().unwrap().clone();
746        let stored = repo::list_known_peers(&self.db).unwrap_or_default();
747        stored
748            .into_iter()
749            .map(|p| {
750                let connected_peer = connected.get(&p.address).copied();
751                KnownPeerStatus {
752                    address: p.address,
753                    label: p.label,
754                    last_connected_at: p.last_connected_at,
755                    connected_peer_id: connected_peer,
756                }
757            })
758            .collect()
759    }
760
761    pub async fn forget_peer(&self, address: &str) -> Result<()> {
762        repo::forget_known_peer(&self.db, address)?;
763        self.connected_dial_addrs.lock().unwrap().remove(address);
764        Ok(())
765    }
766
767    /// Re-dial a stored address — used by the lobby's "reconnect" action.
768    pub async fn redial(&self, address: &str) -> Result<()> {
769        self.dial(address).await
770    }
771
772    /// Phase A: user pressed Accept on the inbound-dial modal. Promotes
773    /// the peer to the gossipsub mesh. Does NOT mark them trusted —
774    /// that's `trust_inbound`, the explicit "remember and bypass next
775    /// time" path.
776    pub async fn accept_inbound(&self, peer_id: PeerId, address: &str) {
777        self.network.accept_inbound(peer_id).await;
778        self.connected_dial_addrs
779            .lock()
780            .unwrap()
781            .insert(address.to_string(), peer_id);
782    }
783
784    /// Phase A: user pressed Reject on the inbound-dial modal. Disconnects
785    /// the peer, adds them to the persistent blocklist, and ensures every
786    /// subsequent connection attempt from this fingerprint is auto-
787    /// dropped without re-prompting.
788    pub async fn reject_inbound(&self, peer_id: PeerId, fingerprint: &str) -> Result<()> {
789        self.network.reject_inbound(peer_id).await;
790        repo::block_peer(&self.db, fingerprint, now_unix())?;
791        Ok(())
792    }
793
794    /// Phase A: user pressed Trust+Accept — accept the connection AND
795    /// remember the peer so subsequent connections bypass the modal.
796    pub async fn trust_inbound(
797        &self,
798        peer_id: PeerId,
799        fingerprint: &str,
800        address: &str,
801    ) -> Result<()> {
802        self.network.accept_inbound(peer_id).await;
803        self.connected_dial_addrs
804            .lock()
805            .unwrap()
806            .insert(address.to_string(), peer_id);
807        // Persist the row with trusted=true so future inbound from
808        // this fingerprint short-circuits the modal in
809        // `process_network_event`'s InboundDial handler.
810        repo::upsert_known_peer(
811            &self.db,
812            &KnownPeer {
813                address: address.to_string(),
814                label: None,
815                last_connected_at: Some(now_unix()),
816                last_attempt_at: Some(now_unix()),
817                created_at: now_unix(),
818                fingerprint: Some(fingerprint.to_string()),
819                trusted: true,
820            },
821        )?;
822        Ok(())
823    }
824
825    fn spawn_known_peer_reconnector(&self) {
826        let handle = self.clone();
827        tokio::spawn(async move {
828            // Brief delay so our own listeners come up first.
829            tokio::time::sleep(Duration::from_millis(500)).await;
830            let known = repo::list_known_peers(&handle.db).unwrap_or_default();
831            // Reconnect each peer from its own task on a staggered, jittered
832            // delay so a long known-peer list doesn't fire a synchronized
833            // burst of dials (and serialized DB writes) all at once.
834            for (i, peer) in known.into_iter().enumerate() {
835                let handle = handle.clone();
836                tokio::spawn(async move {
837                    // Deterministic per-address jitter de-correlates peers
838                    // without pulling an RNG into scope.
839                    let jitter = (peer.address.len() as u64 * 37) % 200;
840                    tokio::time::sleep(Duration::from_millis(150 * i as u64 + jitter)).await;
841                    if let Err(e) = handle.dial(&peer.address).await {
842                        debug!(%e, addr = %peer.address, "auto-reconnect failed");
843                    }
844                });
845            }
846        });
847    }
848
849    // -------------------------------------------------------------------
850    // Internal helpers
851    // -------------------------------------------------------------------
852
853    fn load_or_create_identity(db: &Db) -> Result<Identity> {
854        if let Some(stored) = repo::load_identity(db)? {
855            let mut bytes = [0u8; 32];
856            bytes.copy_from_slice(&stored.ed25519_secret);
857            Identity::from_secret_bytes(bytes)
858        } else {
859            let id = Identity::generate()?;
860            repo::save_identity(db, &id.secret_bytes(), now_unix())?;
861            Ok(id)
862        }
863    }
864
865    fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
866        self.active_rooms
867            .lock()
868            .unwrap()
869            .get(room_id)
870            .and_then(|r| r.info.passphrase_salt.clone())
871            .or_else(|| {
872                // Try the cached announcement salt
873                ROOM_SALT_CACHE
874                    .lock()
875                    .unwrap()
876                    .get(room_id)
877                    .cloned()
878            })
879    }
880
881    async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
882        let owner_fingerprints =
883            repo::list_room_owners(&self.db, &info.id).unwrap_or_default();
884        let verified_only = repo::get_room_verified_only(&self.db, &info.id).unwrap_or(false);
885        let ann = RoomAnnouncement {
886            room_id: info.id.clone(),
887            name: info.name.clone(),
888            encrypted: info.encrypted,
889            passphrase_salt: info.passphrase_salt.clone(),
890            member_count,
891            creator_fingerprint: info.creator_fingerprint.clone(),
892            announced_at: now_unix(),
893            owner_fingerprints,
894            verified_only,
895        };
896        self.network.announce_room(ann).await;
897    }
898
899    async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
900        let our_fp = self.identity.fingerprint().to_string();
901        let wrapped = {
902            let mut rooms = self.active_rooms.lock().unwrap();
903            let room = rooms
904                .get_mut(room_id)
905                .ok_or_else(|| HuddleError::Other("not in room".into()))?;
906            if room.info.encrypted {
907                let crypto = room.crypto.as_mut().unwrap();
908                let session_key = crypto.our_session_key_b64();
909                let passphrase_key = room
910                    .passphrase_key
911                    .as_ref()
912                    .ok_or_else(|| HuddleError::Session("missing passphrase key".into()))?;
913                Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
914            } else {
915                None
916            }
917        };
918        let display_name = repo::get_display_name(&self.db).unwrap_or(None);
919        let msg = RoomMessage::MemberAnnounce {
920            sender_fingerprint: our_fp,
921            wrapped_session_key: wrapped,
922            display_name,
923            sender_ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
924        };
925        let bytes = encode_wire(&msg)?;
926        self.network
927            .publish_room_message(room_id.to_string(), bytes)
928            .await;
929        Ok(())
930    }
931
932    fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
933        let handle = self.clone();
934        tokio::spawn(async move {
935            while let Some(event) = net_rx.recv().await {
936                handle.process_network_event(event).await;
937            }
938            info!("event processor stopped");
939        });
940    }
941
942    fn spawn_announcement_ticker(&self) {
943        let handle = self.clone();
944        tokio::spawn(async move {
945            let mut interval =
946                tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
947            interval.tick().await; // skip the immediate tick
948            loop {
949                interval.tick().await;
950                let snapshot: Vec<(StoredRoom, u32)> = {
951                    let active = handle.active_rooms.lock().unwrap();
952                    active
953                        .values()
954                        .map(|r| (r.info.clone(), r.members.len() as u32))
955                        .collect()
956                };
957                for (info, member_count) in snapshot {
958                    handle.announce_room_now(&info, member_count).await;
959                }
960            }
961        });
962    }
963
964    fn spawn_discovered_room_pruner(&self) {
965        let handle = self.clone();
966        tokio::spawn(async move {
967            let mut interval = tokio::time::interval(Duration::from_secs(10));
968            interval.tick().await;
969            loop {
970                interval.tick().await;
971                let now = now_unix();
972                let mut to_drop = Vec::new();
973                {
974                    let mut map = handle.discovered_rooms.lock().unwrap();
975                    map.retain(|id, r| {
976                        if now - r.last_seen > DISCOVERED_TTL_SECS {
977                            to_drop.push(id.clone());
978                            false
979                        } else {
980                            true
981                        }
982                    });
983                }
984                for id in to_drop {
985                    let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
986                }
987            }
988        });
989    }
990
991    async fn process_network_event(&self, event: NetworkEvent) {
992        match event {
993            NetworkEvent::PeerDiscovered { peer_id } => {
994                let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
995            }
996            NetworkEvent::PeerExpired { peer_id } => {
997                // Drop any tracked dial-connection entry for this peer so
998                // the lobby's online/offline dots stay accurate. mDNS
999                // expiry only gives us a PeerId (no fingerprint), so we
1000                // can't touch room membership here — that relies on the
1001                // explicit MemberLeave path and the discovered-room TTL.
1002                self.connected_dial_addrs
1003                    .lock()
1004                    .unwrap()
1005                    .retain(|_addr, pid| *pid != peer_id);
1006                let _ = self.app_event_tx.send(AppEvent::PeerExpired { peer_id });
1007            }
1008            NetworkEvent::ListeningOn { address } => {
1009                let _ = self.app_event_tx.send(AppEvent::ListeningOn {
1010                    address: address.to_string(),
1011                });
1012            }
1013            NetworkEvent::RoomAnnouncementReceived(ann) => {
1014                // Cache the salt for join_room
1015                if let Some(salt) = &ann.passphrase_salt {
1016                    ROOM_SALT_CACHE
1017                        .lock()
1018                        .unwrap()
1019                        .insert(ann.room_id.clone(), salt.clone());
1020                }
1021                let discovered = DiscoveredRoom {
1022                    room_id: ann.room_id.clone(),
1023                    name: ann.name.clone(),
1024                    encrypted: ann.encrypted,
1025                    member_count: ann.member_count,
1026                    creator_fingerprint: ann.creator_fingerprint.clone(),
1027                    last_seen: now_unix(),
1028                    restorable: false,
1029                };
1030                // If we're already in this room, cache the announcement so
1031                // others can still discover it through us, but don't emit
1032                // RoomDiscovered — it isn't "newly discovered" to us, and
1033                // emitting it spuriously re-opens the lobby join prompt.
1034                if self.active_rooms.lock().unwrap().contains_key(&ann.room_id) {
1035                    self.discovered_rooms
1036                        .lock()
1037                        .unwrap()
1038                        .insert(ann.room_id.clone(), discovered);
1039                    return;
1040                }
1041                self.discovered_rooms
1042                    .lock()
1043                    .unwrap()
1044                    .insert(ann.room_id.clone(), discovered.clone());
1045                let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
1046            }
1047            NetworkEvent::RoomMessageReceived {
1048                room_id,
1049                payload,
1050                from_peer: _,
1051            } => {
1052                // v0.3.0+: every wire message is a `WireMessage` envelope.
1053                // `Plain` carries an unsigned `RoomMessage`; `Signed` is an
1054                // app-level Ed25519 envelope that we verify before
1055                // unwrapping. A failed verify is logged and dropped — we
1056                // never dispatch unverified-but-claiming-to-be-signed
1057                // messages.
1058                let wire: WireMessage = match serde_json::from_slice(&payload) {
1059                    Ok(w) => w,
1060                    Err(e) => {
1061                        warn!(%e, "bad wire envelope");
1062                        return;
1063                    }
1064                };
1065                let (msg, verified_signer) = match wire {
1066                    WireMessage::Plain(m) => (m, None),
1067                    WireMessage::Signed(env) => {
1068                        match crate::crypto::verify_signed(&env) {
1069                            Ok((m, fp)) => (m, Some(fp)),
1070                            Err(e) => {
1071                                warn!(%e, fp = %env.fingerprint, "signed envelope verify failed");
1072                                return;
1073                            }
1074                        }
1075                    }
1076                };
1077                self.handle_room_message(&room_id, msg, verified_signer).await;
1078            }
1079            NetworkEvent::DialSucceeded { peer_id, address } => {
1080                let addr_s = address.to_string();
1081                self.connected_dial_addrs
1082                    .lock()
1083                    .unwrap()
1084                    .insert(addr_s.clone(), peer_id);
1085                // Fingerprint isn't known yet (Identify hasn't landed);
1086                // the PeerIdentified handler below upserts again to add
1087                // the fingerprint and flip trusted=true once it does.
1088                let _ = repo::upsert_known_peer(
1089                    &self.db,
1090                    &KnownPeer {
1091                        address: addr_s.clone(),
1092                        label: None,
1093                        last_connected_at: Some(now_unix()),
1094                        last_attempt_at: Some(now_unix()),
1095                        created_at: now_unix(),
1096                        fingerprint: None,
1097                        trusted: false,
1098                    },
1099                );
1100                let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
1101                    address: addr_s,
1102                    peer_id,
1103                });
1104            }
1105            NetworkEvent::DialFailed { address, error } => {
1106                let addr_s = address.to_string();
1107                let _ = self.app_event_tx.send(AppEvent::DialFailed {
1108                    address: addr_s,
1109                    error,
1110                });
1111            }
1112            NetworkEvent::PeerIdentified { peer_id, fingerprint } => {
1113                // For any address we user-dialed for this peer, retroactively
1114                // backfill the fingerprint and flip trusted=true. The
1115                // upsert's COALESCE preserves fingerprint once set and
1116                // its trusted-is-sticky-once-true clause means we don't
1117                // accidentally demote a row that was already trusted.
1118                let matched_addrs: Vec<String> = {
1119                    let map = self.connected_dial_addrs.lock().unwrap();
1120                    map.iter()
1121                        .filter_map(|(addr, pid)| {
1122                            if *pid == peer_id {
1123                                Some(addr.clone())
1124                            } else {
1125                                None
1126                            }
1127                        })
1128                        .collect()
1129                };
1130                for addr in matched_addrs {
1131                    let _ = repo::upsert_known_peer(
1132                        &self.db,
1133                        &KnownPeer {
1134                            address: addr,
1135                            label: None,
1136                            last_connected_at: Some(now_unix()),
1137                            last_attempt_at: Some(now_unix()),
1138                            created_at: now_unix(),
1139                            fingerprint: Some(fingerprint.clone()),
1140                            trusted: true,
1141                        },
1142                    );
1143                }
1144            }
1145            NetworkEvent::RelayReservationEstablished { address } => {
1146                // Treat the circuit address like any other listen
1147                // address — the TUI's ListeningOn handler dedups + adds
1148                // it to the addresses pane. Also emit a status hint via
1149                // ListeningOn so the lobby's reachability line updates.
1150                info!(addr = %address, "relay reservation established");
1151                let _ = self.app_event_tx.send(AppEvent::ListeningOn {
1152                    address: address.to_string(),
1153                });
1154            }
1155            NetworkEvent::InboundDial {
1156                peer_id,
1157                fingerprint,
1158                address,
1159            } => {
1160                // First: cheap server-side filters before bothering the user.
1161                if repo::is_peer_blocked(&self.db, &fingerprint).unwrap_or(false) {
1162                    info!(%fingerprint, "inbound dial auto-rejected: peer is blocked");
1163                    self.network.reject_inbound(peer_id).await;
1164                    return;
1165                }
1166                // Phase E: global verified-only inbound mode. If on,
1167                // reject any unverified fingerprint without prompting.
1168                // SAS-verified (Phase G) and already-trusted (Phase A)
1169                // peers still come through.
1170                let global_verified_only =
1171                    repo::get_setting(&self.db, "verified_only_inbound")
1172                        .ok()
1173                        .flatten()
1174                        .map(|v| v == "1")
1175                        .unwrap_or(false);
1176                if global_verified_only {
1177                    let is_verified =
1178                        repo::is_globally_verified(&self.db, &fingerprint).unwrap_or(false)
1179                            || repo::is_fingerprint_trusted(&self.db, &fingerprint)
1180                                .unwrap_or(false);
1181                    if !is_verified {
1182                        info!(
1183                            %fingerprint,
1184                            "inbound dial auto-rejected: verified-only mode"
1185                        );
1186                        self.network.reject_inbound(peer_id).await;
1187                        return;
1188                    }
1189                }
1190                if repo::is_fingerprint_trusted(&self.db, &fingerprint).unwrap_or(false) {
1191                    info!(%fingerprint, "inbound dial auto-accepted: peer is trusted");
1192                    // Persist the address → peer_id mapping just as a
1193                    // user-dial would, so the lobby's online dot lights up.
1194                    self.connected_dial_addrs
1195                        .lock()
1196                        .unwrap()
1197                        .insert(address.to_string(), peer_id);
1198                    let _ = repo::upsert_known_peer(
1199                        &self.db,
1200                        &KnownPeer {
1201                            address: address.to_string(),
1202                            label: None,
1203                            last_connected_at: Some(now_unix()),
1204                            last_attempt_at: Some(now_unix()),
1205                            created_at: now_unix(),
1206                            fingerprint: Some(fingerprint),
1207                            trusted: true,
1208                        },
1209                    );
1210                    self.network.accept_inbound(peer_id).await;
1211                    return;
1212                }
1213                // Unknown peer — surface the modal in the TUI.
1214                let _ = self.app_event_tx.send(AppEvent::InboundDial {
1215                    peer_id,
1216                    fingerprint,
1217                    address: address.to_string(),
1218                });
1219            }
1220        }
1221    }
1222
1223    /// `verified_signer` is `Some(fp)` if this message arrived inside a
1224    /// successfully-verified `WireMessage::Signed` envelope — in which
1225    /// case the inner sender_fingerprint *must* match. `None` for
1226    /// `WireMessage::Plain`. Phase B's `OwnerGrant`/`BanMember` arms
1227    /// require it to be `Some` AND the signer to be a current owner.
1228    async fn handle_room_message(
1229        &self,
1230        room_id: &str,
1231        msg: RoomMessage,
1232        verified_signer: Option<String>,
1233    ) {
1234        let our_fp = self.identity.fingerprint().to_string();
1235        match msg {
1236            RoomMessage::MemberAnnounce {
1237                sender_fingerprint,
1238                wrapped_session_key,
1239                display_name,
1240                sender_ed25519_pubkey,
1241            } => {
1242                if sender_fingerprint == our_fp {
1243                    return;
1244                }
1245                // Drop announcements from banned fingerprints — they
1246                // can't rejoin until an owner unbans them (Phase B).
1247                if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
1248                    .unwrap_or(false)
1249                {
1250                    info!(%sender_fingerprint, %room_id, "dropping MemberAnnounce from banned peer");
1251                    return;
1252                }
1253                // Phase E per-room enforcement: if this room is
1254                // verified-only and the joiner isn't globally SAS-
1255                // verified, refuse to add them. The lowest-fp owner
1256                // (deterministic across honest peers) also sends a
1257                // signed `JoinRefused` so the joiner gets an explicit
1258                // message instead of a silent hang.
1259                if repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
1260                    && !repo::is_globally_verified(&self.db, &sender_fingerprint).unwrap_or(false)
1261                {
1262                    info!(
1263                        %sender_fingerprint, %room_id,
1264                        "dropping MemberAnnounce: room is verified-only and joiner isn't verified"
1265                    );
1266                    let owners = repo::list_room_owners(&self.db, room_id).unwrap_or_default();
1267                    let lowest_owner = owners.iter().min().cloned();
1268                    if lowest_owner.as_deref() == Some(&our_fp) {
1269                        let msg = RoomMessage::JoinRefused {
1270                            room_id: room_id.to_string(),
1271                            target_fingerprint: sender_fingerprint.clone(),
1272                            reason: "room requires SAS verification — ask an existing member to verify you".into(),
1273                        };
1274                        if let Ok(env) = crate::crypto::sign_message(&self.identity, &msg) {
1275                            if let Ok(bytes) =
1276                                crate::network::protocol::encode_wire_signed(&env)
1277                            {
1278                                self.network
1279                                    .publish_room_message(room_id.to_string(), bytes)
1280                                    .await;
1281                            }
1282                        }
1283                    }
1284                    return;
1285                }
1286                let need_inbound = {
1287                    let mut rooms = self.active_rooms.lock().unwrap();
1288                    let room = match rooms.get_mut(room_id) {
1289                        Some(r) => r,
1290                        None => return,
1291                    };
1292                    let newly_added = room.members.insert(sender_fingerprint.clone());
1293                    if newly_added {
1294                        let _ = self.app_event_tx.send(AppEvent::MemberJoined {
1295                            room_id: room_id.to_string(),
1296                            fingerprint: sender_fingerprint.clone(),
1297                        });
1298                    }
1299                    // Persist member with optional display name + pubkey.
1300                    // `ed25519_pubkey` is `None` for pre-0.3 peers; the
1301                    // upsert COALESCEs so once we learn it we never lose
1302                    // it on a later announce that drops the field.
1303                    let _ = repo::upsert_room_member(
1304                        &self.db,
1305                        &StoredRoomMember {
1306                            room_id: room_id.to_string(),
1307                            peer_id: String::new(), // unknown at this layer
1308                            fingerprint: sender_fingerprint.clone(),
1309                            last_seen: Some(now_unix()),
1310                            verified: false,
1311                            ed25519_pubkey: sender_ed25519_pubkey.clone(),
1312                            // Role is set on first insert only — the
1313                            // upsert ON CONFLICT clause preserves an
1314                            // existing 'owner' on re-announce. A genuine
1315                            // new fingerprint is a 'member' until an
1316                            // OwnerGrant lands.
1317                            role: "member".into(),
1318                        },
1319                    );
1320                    if let Some(name) = display_name.as_deref() {
1321                        let _ = repo::set_member_display_name(
1322                            &self.db,
1323                            room_id,
1324                            &sender_fingerprint,
1325                            Some(name),
1326                        );
1327                    }
1328                    room.info.encrypted && wrapped_session_key.is_some()
1329                };
1330
1331                if need_inbound {
1332                    let wrapped = wrapped_session_key.unwrap();
1333                    let result = {
1334                        let mut rooms = self.active_rooms.lock().unwrap();
1335                        let room = rooms.get_mut(room_id).unwrap();
1336                        let passphrase_key = match &room.passphrase_key {
1337                            Some(k) => k,
1338                            None => {
1339                                warn!("no passphrase key when receiving session key");
1340                                return;
1341                            }
1342                        };
1343                        match passphrase::unwrap(&wrapped, passphrase_key) {
1344                            Ok(plain) => match String::from_utf8(plain) {
1345                                Ok(key_b64) => {
1346                                    let crypto = room.crypto.as_mut().unwrap();
1347                                    crypto.add_inbound_session(&sender_fingerprint, &key_b64)
1348                                }
1349                                Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
1350                            },
1351                            Err(e) => Err(e),
1352                        }
1353                    };
1354                    if let Err(e) = result {
1355                        error!(%e, "add inbound session failed");
1356                    }
1357                }
1358            }
1359            RoomMessage::SessionKeyRequest {
1360                requester_fingerprint,
1361            } => {
1362                if requester_fingerprint == our_fp {
1363                    return;
1364                }
1365                // Re-announce ourselves to share our session key with the new joiner.
1366                if let Err(e) = self.broadcast_member_announce(room_id).await {
1367                    warn!(%e, "broadcast member announce on request");
1368                }
1369            }
1370            RoomMessage::Encrypted {
1371                sender_fingerprint,
1372                session_id,
1373                ciphertext_b64,
1374            } => {
1375                if sender_fingerprint == our_fp {
1376                    return;
1377                }
1378                let ct_bytes = match base64::Engine::decode(
1379                    &base64::engine::general_purpose::STANDARD,
1380                    &ciphertext_b64,
1381                ) {
1382                    Ok(b) => b,
1383                    Err(e) => {
1384                        warn!(%e, "bad base64 ciphertext");
1385                        return;
1386                    }
1387                };
1388                let plaintext = {
1389                    let mut rooms = self.active_rooms.lock().unwrap();
1390                    let room = match rooms.get_mut(room_id) {
1391                        Some(r) => r,
1392                        None => return,
1393                    };
1394                    let crypto = match room.crypto.as_mut() {
1395                        Some(c) => c,
1396                        None => return,
1397                    };
1398                    crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
1399                };
1400                match plaintext {
1401                    Ok(pt) => {
1402                        let body = String::from_utf8_lossy(&pt).to_string();
1403                        let sent_at = now_unix();
1404                        let _ = repo::insert_room_message(
1405                            &self.db,
1406                            room_id,
1407                            &sender_fingerprint,
1408                            "in",
1409                            &body,
1410                            sent_at,
1411                        );
1412                        let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1413                        self.maybe_emit_mention(room_id, &body);
1414                        let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1415                            room_id: room_id.to_string(),
1416                            sender_fingerprint,
1417                            body,
1418                            sent_at,
1419                        });
1420                    }
1421                    Err(e) => {
1422                        debug!(%e, "decrypt failed (probably missing session key)");
1423                    }
1424                }
1425            }
1426            RoomMessage::Plain {
1427                sender_fingerprint,
1428                body,
1429            } => {
1430                if sender_fingerprint == our_fp {
1431                    return;
1432                }
1433                let sent_at = now_unix();
1434                let _ = repo::insert_room_message(
1435                    &self.db,
1436                    room_id,
1437                    &sender_fingerprint,
1438                    "in",
1439                    &body,
1440                    sent_at,
1441                );
1442                let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1443                self.maybe_emit_mention(room_id, &body);
1444                let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1445                    room_id: room_id.to_string(),
1446                    sender_fingerprint,
1447                    body,
1448                    sent_at,
1449                });
1450            }
1451            RoomMessage::Typing { sender_fingerprint } => {
1452                if sender_fingerprint == our_fp {
1453                    return;
1454                }
1455                let expiry = now_unix() + TYPING_TTL_SECS;
1456                let mut rooms = self.active_rooms.lock().unwrap();
1457                if let Some(room) = rooms.get_mut(room_id) {
1458                    room.typers.insert(sender_fingerprint, expiry);
1459                }
1460                drop(rooms);
1461                let _ = self.app_event_tx.send(AppEvent::TypingChanged {
1462                    room_id: room_id.to_string(),
1463                });
1464            }
1465            RoomMessage::RotateRoomKey {
1466                rotator_fingerprint,
1467                new_salt,
1468            } => {
1469                if rotator_fingerprint == our_fp {
1470                    return;
1471                }
1472                let _ = self.app_event_tx.send(AppEvent::RotationRequested {
1473                    room_id: room_id.to_string(),
1474                    rotator_fingerprint,
1475                    new_salt,
1476                });
1477            }
1478            RoomMessage::MemberLeave { sender_fingerprint } => {
1479                if sender_fingerprint == our_fp {
1480                    return;
1481                }
1482                let removed = {
1483                    let mut rooms = self.active_rooms.lock().unwrap();
1484                    if let Some(room) = rooms.get_mut(room_id) {
1485                        room.members.remove(&sender_fingerprint)
1486                    } else {
1487                        false
1488                    }
1489                };
1490                if removed {
1491                    let _ = self.app_event_tx.send(AppEvent::MemberLeft {
1492                        room_id: room_id.to_string(),
1493                        fingerprint: sender_fingerprint,
1494                    });
1495                }
1496            }
1497            RoomMessage::FileOffer {
1498                sender_fingerprint,
1499                file_id,
1500                name,
1501                size_bytes,
1502                mime,
1503                chunk_count,
1504                encrypted_meta,
1505            } => {
1506                if sender_fingerprint == our_fp {
1507                    return; // ignore our own broadcast
1508                }
1509                self.handle_file_offer(
1510                    room_id,
1511                    sender_fingerprint,
1512                    file_id,
1513                    name,
1514                    size_bytes,
1515                    mime,
1516                    chunk_count,
1517                    encrypted_meta,
1518                );
1519            }
1520            RoomMessage::FileChunk {
1521                sender_fingerprint,
1522                file_id,
1523                chunk_index,
1524                total_chunks,
1525                data_b64,
1526            } => {
1527                if sender_fingerprint == our_fp {
1528                    return;
1529                }
1530                self.handle_file_chunk(
1531                    room_id,
1532                    sender_fingerprint,
1533                    file_id,
1534                    chunk_index,
1535                    total_chunks,
1536                    data_b64,
1537                );
1538            }
1539            RoomMessage::OwnerGrant {
1540                room_id: announced_room_id,
1541                target_fingerprint,
1542            } => {
1543                // Both: payload room_id must match the topic's room_id
1544                // (no cross-room replay), AND the signer must be a
1545                // current owner of this room. Unsigned forgeries land in
1546                // `verified_signer = None` and are dropped here.
1547                if announced_room_id != room_id {
1548                    warn!(payload_room = %announced_room_id, topic_room = %room_id, "OwnerGrant room mismatch");
1549                    return;
1550                }
1551                let signer = match verified_signer {
1552                    Some(fp) => fp,
1553                    None => {
1554                        warn!(%room_id, "OwnerGrant arrived unsigned; dropping");
1555                        return;
1556                    }
1557                };
1558                if !self.is_owner(room_id, &signer) {
1559                    warn!(%signer, %room_id, "OwnerGrant signer isn't an owner; dropping");
1560                    return;
1561                }
1562                info!(%signer, %target_fingerprint, %room_id, "OwnerGrant applied");
1563                if let Err(e) =
1564                    repo::set_member_role(&self.db, room_id, &target_fingerprint, "owner")
1565                {
1566                    warn!(%e, "OwnerGrant: set_member_role failed");
1567                }
1568            }
1569            RoomMessage::BanMember {
1570                room_id: announced_room_id,
1571                target_fingerprint,
1572            } => {
1573                if announced_room_id != room_id {
1574                    warn!(payload_room = %announced_room_id, topic_room = %room_id, "BanMember room mismatch");
1575                    return;
1576                }
1577                let signer = match verified_signer {
1578                    Some(fp) => fp,
1579                    None => {
1580                        warn!(%room_id, "BanMember arrived unsigned; dropping");
1581                        return;
1582                    }
1583                };
1584                if !self.is_owner(room_id, &signer) {
1585                    warn!(%signer, %room_id, "BanMember signer isn't an owner; dropping");
1586                    return;
1587                }
1588                if target_fingerprint == our_fp {
1589                    // We've been kicked. Locally evict ourselves so the
1590                    // TUI tabs close; the kicker's subsequent
1591                    // RotateRoomKey will arrive separately and we
1592                    // simply won't be able to decrypt the new key,
1593                    // matching the "soft kick" semantics.
1594                    info!(%room_id, %signer, "we were kicked from this room");
1595                    self.active_rooms.lock().unwrap().remove(room_id);
1596                    let _ = self.app_event_tx.send(AppEvent::RoomLeft {
1597                        room_id: room_id.to_string(),
1598                    });
1599                    return;
1600                }
1601                info!(%signer, %target_fingerprint, %room_id, "BanMember applied");
1602                if let Err(e) = repo::add_room_ban(
1603                    &self.db,
1604                    room_id,
1605                    &target_fingerprint,
1606                    &signer,
1607                    "", // signature lives in the envelope, not the row
1608                    now_unix(),
1609                ) {
1610                    warn!(%e, "BanMember: add_room_ban failed");
1611                }
1612                self.evict_banned_member(room_id, &target_fingerprint);
1613            }
1614            RoomMessage::SasInit {
1615                tx_id,
1616                ephemeral_x25519_pubkey_b64,
1617                target_fingerprint,
1618            } => {
1619                if target_fingerprint != our_fp {
1620                    // Not addressed to us — ignore. Phase G is point-
1621                    // to-point even though it travels over the room
1622                    // topic, so members of the room who aren't the
1623                    // target don't need to act.
1624                    return;
1625                }
1626                let signer = match verified_signer {
1627                    Some(fp) => fp,
1628                    None => {
1629                        warn!("SasInit arrived unsigned; dropping");
1630                        return;
1631                    }
1632                };
1633                let their_pub =
1634                    match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
1635                        Ok(pk) => pk,
1636                        Err(e) => {
1637                            warn!(%e, "SasInit: bad x25519 pubkey");
1638                            return;
1639                        }
1640                    };
1641                let tx_id_bytes = match B64.decode(&tx_id) {
1642                    Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
1643                        let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
1644                        arr.copy_from_slice(&b);
1645                        arr
1646                    }
1647                    _ => {
1648                        warn!(%tx_id, "SasInit: bad tx_id length");
1649                        return;
1650                    }
1651                };
1652                let (_, our_secret, our_pub) = crate::crypto::sas::new_session();
1653                let sas_code =
1654                    crate::crypto::sas::derive_sas_code(&our_secret, &their_pub, &tx_id_bytes);
1655                self.sas_flows.lock().unwrap().insert(
1656                    tx_id.clone(),
1657                    SasFlow {
1658                        room_id: room_id.to_string(),
1659                        partner_fingerprint: signer.clone(),
1660                        our_secret,
1661                        sas_code: Some(sas_code.clone()),
1662                        our_confirmed: false,
1663                        their_confirmed: false,
1664                    },
1665                );
1666                // Respond with our pubkey so the initiator can compute
1667                // the same code.
1668                let response = RoomMessage::SasResponse {
1669                    tx_id: tx_id.clone(),
1670                    ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
1671                };
1672                if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
1673                    if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
1674                        self.network
1675                            .publish_room_message(room_id.to_string(), bytes)
1676                            .await;
1677                    }
1678                }
1679                let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
1680                    room_id: room_id.to_string(),
1681                    partner_fingerprint: signer,
1682                    tx_id,
1683                    emoji_string: sas_code.emoji_string(),
1684                    emoji_labels: sas_code.emoji_labels(),
1685                    decimal: sas_code.decimal,
1686                });
1687            }
1688            RoomMessage::SasResponse {
1689                tx_id,
1690                ephemeral_x25519_pubkey_b64,
1691            } => {
1692                let signer = match verified_signer {
1693                    Some(fp) => fp,
1694                    None => {
1695                        warn!("SasResponse arrived unsigned; dropping");
1696                        return;
1697                    }
1698                };
1699                let their_pub =
1700                    match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
1701                        Ok(pk) => pk,
1702                        Err(e) => {
1703                            warn!(%e, "SasResponse: bad x25519 pubkey");
1704                            return;
1705                        }
1706                    };
1707                let tx_id_bytes = match B64.decode(&tx_id) {
1708                    Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
1709                        let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
1710                        arr.copy_from_slice(&b);
1711                        arr
1712                    }
1713                    _ => return,
1714                };
1715                let emit = {
1716                    let mut flows = self.sas_flows.lock().unwrap();
1717                    let flow = match flows.get_mut(&tx_id) {
1718                        Some(f) => f,
1719                        None => {
1720                            warn!(%tx_id, "SasResponse for unknown tx_id");
1721                            return;
1722                        }
1723                    };
1724                    if flow.partner_fingerprint != signer {
1725                        warn!(
1726                            expected = %flow.partner_fingerprint, got = %signer,
1727                            "SasResponse signer doesn't match flow's partner; dropping"
1728                        );
1729                        return;
1730                    }
1731                    let code = crate::crypto::sas::derive_sas_code(
1732                        &flow.our_secret,
1733                        &their_pub,
1734                        &tx_id_bytes,
1735                    );
1736                    flow.sas_code = Some(code.clone());
1737                    code
1738                };
1739                let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
1740                    room_id: room_id.to_string(),
1741                    partner_fingerprint: signer,
1742                    tx_id,
1743                    emoji_string: emit.emoji_string(),
1744                    emoji_labels: emit.emoji_labels(),
1745                    decimal: emit.decimal,
1746                });
1747            }
1748            RoomMessage::CodeJoinRequest {
1749                room_id: announced_room_id,
1750                joiner_x25519_pubkey_b64,
1751                code,
1752            } => {
1753                if announced_room_id != room_id {
1754                    return;
1755                }
1756                let joiner_fp = match verified_signer {
1757                    Some(fp) => fp,
1758                    None => {
1759                        warn!("CodeJoinRequest unsigned; dropping");
1760                        return;
1761                    }
1762                };
1763                // Only owners with an active code are interested in
1764                // responding. Other peers (incl. non-issuing owners)
1765                // simply ignore.
1766                let our_fp = self.identity.fingerprint().to_string();
1767                if !self.is_owner(room_id, &our_fp) {
1768                    return;
1769                }
1770                // Match + consume the code. Single use.
1771                let now = now_unix();
1772                let (code_ok, our_session_id, wrap_input) = {
1773                    let mut rooms = self.active_rooms.lock().unwrap();
1774                    let room = match rooms.get_mut(room_id) {
1775                        Some(r) => r,
1776                        None => return,
1777                    };
1778                    if room.passphrase_key.is_none() {
1779                        warn!("CodeJoinRequest: no passphrase key locally; can't respond");
1780                        return;
1781                    }
1782                    let original_len = room.issued_codes.len();
1783                    room.issued_codes.retain(|(c, exp)| !(c == &code && *exp > now));
1784                    let matched = room.issued_codes.len() < original_len;
1785                    if !matched {
1786                        info!(%joiner_fp, "CodeJoinRequest: code invalid or expired; ignoring");
1787                        return;
1788                    }
1789                    let crypto = room.crypto.as_ref().unwrap();
1790                    (
1791                        true,
1792                        crypto.our_session_id(),
1793                        crypto.our_session_key_b64(),
1794                    )
1795                };
1796                let _ = code_ok;
1797                // ECDH with the joiner's ephemeral pubkey.
1798                let their_pub = match crate::crypto::sas::parse_pubkey(&joiner_x25519_pubkey_b64) {
1799                    Ok(pk) => pk,
1800                    Err(e) => {
1801                        warn!(%e, "CodeJoinRequest: bad pubkey");
1802                        return;
1803                    }
1804                };
1805                use x25519_dalek::{PublicKey, StaticSecret};
1806                let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
1807                let our_pub = PublicKey::from(&our_secret);
1808                let shared = our_secret.diffie_hellman(&their_pub);
1809                // HKDF the shared secret into a 32-byte wrap key.
1810                let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
1811                let mut wrap_key = [0u8; passphrase::KEY_LEN];
1812                hk.expand(b"huddle-code-join-v1", &mut wrap_key)
1813                    .expect("32 bytes is within HKDF limits");
1814                // Wrap our session key under the ECDH-derived key,
1815                // reusing the existing AEAD primitives.
1816                let wrapped = match passphrase::wrap(wrap_input.as_bytes(), &wrap_key) {
1817                    Ok(w) => w,
1818                    Err(e) => {
1819                        warn!(%e, "CodeJoinRequest: wrap failed");
1820                        return;
1821                    }
1822                };
1823                let response = RoomMessage::CodeJoinResponse {
1824                    room_id: room_id.to_string(),
1825                    target_fingerprint: joiner_fp.clone(),
1826                    owner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
1827                    owner_session_id: our_session_id,
1828                    wrapped_session_key_b64: wrapped,
1829                    nonce_b64: String::new(), // nonce is embedded in `wrapped` per passphrase::wrap
1830                };
1831                if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
1832                    if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
1833                        self.network
1834                            .publish_room_message(room_id.to_string(), bytes)
1835                            .await;
1836                    }
1837                }
1838                info!(%joiner_fp, %room_id, "issued CodeJoinResponse");
1839            }
1840            RoomMessage::CodeJoinResponse {
1841                room_id: announced_room_id,
1842                target_fingerprint,
1843                owner_x25519_pubkey_b64,
1844                owner_session_id,
1845                wrapped_session_key_b64,
1846                nonce_b64: _,
1847            } => {
1848                if announced_room_id != room_id || target_fingerprint != our_fp {
1849                    return;
1850                }
1851                let owner_fp = match verified_signer {
1852                    Some(fp) => fp,
1853                    None => {
1854                        warn!("CodeJoinResponse unsigned; dropping");
1855                        return;
1856                    }
1857                };
1858                let our_secret = match self
1859                    .pending_code_secrets
1860                    .lock()
1861                    .unwrap()
1862                    .remove(room_id)
1863                {
1864                    Some(s) => s,
1865                    None => {
1866                        warn!(%room_id, "CodeJoinResponse with no pending code-join state");
1867                        return;
1868                    }
1869                };
1870                let owner_pub = match crate::crypto::sas::parse_pubkey(&owner_x25519_pubkey_b64) {
1871                    Ok(pk) => pk,
1872                    Err(e) => {
1873                        warn!(%e, "CodeJoinResponse: bad owner pubkey");
1874                        return;
1875                    }
1876                };
1877                let shared = our_secret.diffie_hellman(&owner_pub);
1878                let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
1879                let mut wrap_key = [0u8; passphrase::KEY_LEN];
1880                hk.expand(b"huddle-code-join-v1", &mut wrap_key)
1881                    .expect("32 bytes within HKDF limits");
1882                let session_key_bytes =
1883                    match passphrase::unwrap(&wrapped_session_key_b64, &wrap_key) {
1884                        Ok(b) => b,
1885                        Err(e) => {
1886                            warn!(%e, "CodeJoinResponse: unwrap failed");
1887                            return;
1888                        }
1889                    };
1890                let session_key_str = match String::from_utf8(session_key_bytes) {
1891                    Ok(s) => s,
1892                    Err(e) => {
1893                        warn!(%e, "CodeJoinResponse: session key wasn't valid utf8");
1894                        return;
1895                    }
1896                };
1897                // Install as an inbound session keyed by the owner's fp.
1898                let mut rooms = self.active_rooms.lock().unwrap();
1899                if let Some(room) = rooms.get_mut(room_id) {
1900                    if let Some(crypto) = room.crypto.as_mut() {
1901                        if let Err(e) =
1902                            crypto.add_inbound_session(&owner_fp, &session_key_str)
1903                        {
1904                            warn!(%e, "CodeJoinResponse: add_inbound_session failed");
1905                        } else {
1906                            info!(%room_id, %owner_fp, %owner_session_id, "code-join completed; can decrypt owner's messages");
1907                            room.members.insert(owner_fp.clone());
1908                            let _ = self.app_event_tx.send(AppEvent::MemberJoined {
1909                                room_id: room_id.to_string(),
1910                                fingerprint: owner_fp,
1911                            });
1912                        }
1913                    }
1914                }
1915            }
1916            RoomMessage::JoinRefused {
1917                room_id: announced_room_id,
1918                target_fingerprint,
1919                reason,
1920            } => {
1921                if announced_room_id != room_id || target_fingerprint != our_fp {
1922                    return;
1923                }
1924                // Surface the refusal as an Error so the user sees why
1925                // their join didn't take. The Phase 3 modal-queue rule
1926                // means this won't clobber typing in another modal.
1927                let _ = self.app_event_tx.send(AppEvent::Error {
1928                    description: format!("join refused: {reason}"),
1929                });
1930            }
1931            RoomMessage::SasConfirm { tx_id, matched } => {
1932                let signer = match verified_signer {
1933                    Some(fp) => fp,
1934                    None => return,
1935                };
1936                let (room_id_done, partner_fp_done, both_done) = {
1937                    let mut flows = self.sas_flows.lock().unwrap();
1938                    let flow = match flows.get_mut(&tx_id) {
1939                        Some(f) => f,
1940                        None => return,
1941                    };
1942                    if flow.partner_fingerprint != signer {
1943                        return;
1944                    }
1945                    if !matched {
1946                        // Partner declined / mismatch — drop the flow.
1947                        let _ = flow;
1948                        flows.remove(&tx_id);
1949                        return;
1950                    }
1951                    flow.their_confirmed = true;
1952                    if flow.our_confirmed && flow.their_confirmed {
1953                        (
1954                            Some(flow.room_id.clone()),
1955                            Some(flow.partner_fingerprint.clone()),
1956                            true,
1957                        )
1958                    } else {
1959                        (None, None, false)
1960                    }
1961                };
1962                if both_done {
1963                    if let (Some(rid), Some(pfp)) = (room_id_done, partner_fp_done) {
1964                        if let Err(e) = self.finish_sas(&tx_id, &rid, &pfp).await {
1965                            warn!(%e, "finish_sas failed");
1966                        }
1967                    }
1968                }
1969            }
1970        }
1971    }
1972
1973    // -------------------------------------------------------------------
1974    // File transfer — public API
1975    // -------------------------------------------------------------------
1976
1977    /// Send a local file to a room. Reads the file, optionally encrypts
1978    /// it for encrypted rooms, chunks it, broadcasts a FileOffer then
1979    /// each FileChunk. Returns the file_id once all chunks are queued.
1980    pub async fn send_file(&self, room_id: &str, path: &Path) -> Result<String> {
1981        let bytes = std::fs::read(path)?;
1982        let name = path
1983            .file_name()
1984            .map(|n| n.to_string_lossy().to_string())
1985            .unwrap_or_else(|| "untitled".into());
1986        let mime = crate::files::guess_mime(&name);
1987        let original_path = path.to_path_buf();
1988
1989        let (room_encrypted, mut maybe_session_id, encrypted_meta_opt, wire_bytes) = {
1990            let mut rooms = self.active_rooms.lock().unwrap();
1991            let room = rooms
1992                .get_mut(room_id)
1993                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1994            if room.info.encrypted {
1995                let crypto = room
1996                    .crypto
1997                    .as_mut()
1998                    .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
1999                let (ciphertext, meta) = file_encryption::encrypt_file(&bytes, crypto)?;
2000                (true, Some(meta.megolm_session_id.clone()), Some(meta), ciphertext)
2001            } else {
2002                (false, None, None, bytes)
2003            }
2004        };
2005        let _ = &mut maybe_session_id; // silence unused warning when non-encrypted
2006
2007        let plan =
2008            self.file_manager
2009                .prepare_outgoing_from_bytes(&name, mime.clone(), wire_bytes)?;
2010        let file_id = plan.file_id.clone();
2011        let total = plan.chunks.len() as u32;
2012        let our_fp = self.identity.fingerprint().to_string();
2013
2014        let attachment = StoredAttachment {
2015            id: 0,
2016            room_id: room_id.to_string(),
2017            message_id: None,
2018            sender_fingerprint: our_fp.clone(),
2019            file_id: file_id.clone(),
2020            name: name.clone(),
2021            mime: mime.clone(),
2022            size_bytes: plan.size_bytes as i64,
2023            status: AttachmentStatus::Ready,
2024            cache_path: Some(self.file_manager.cache_path(&file_id).to_string_lossy().into()),
2025            saved_path: Some(original_path.to_string_lossy().into()),
2026            error: None,
2027            encrypted: room_encrypted,
2028            wrapped_key: encrypted_meta_opt.as_ref().map(|m| m.wrapped_key_b64.clone()),
2029            nonce: encrypted_meta_opt.as_ref().map(|m| m.nonce_b64.clone()),
2030            megolm_session_id: encrypted_meta_opt
2031                .as_ref()
2032                .map(|m| m.megolm_session_id.clone()),
2033            content_hash: encrypted_meta_opt.as_ref().map(|m| m.content_hash.clone()),
2034            created_at: now_unix(),
2035        };
2036        repo::upsert_attachment(&self.db, &attachment)?;
2037        let _ = self.app_event_tx.send(AppEvent::FileOffered {
2038            room_id: room_id.to_string(),
2039            file_id: file_id.clone(),
2040            name: name.clone(),
2041            size_bytes: plan.size_bytes,
2042            sender_fingerprint: our_fp.clone(),
2043        });
2044
2045        // Publish the offer.
2046        let offer = RoomMessage::FileOffer {
2047            sender_fingerprint: our_fp.clone(),
2048            file_id: file_id.clone(),
2049            name,
2050            size_bytes: plan.size_bytes,
2051            mime,
2052            chunk_count: total,
2053            encrypted_meta: encrypted_meta_opt,
2054        };
2055        if let Ok(bytes) = encode_wire(&offer) {
2056            self.network
2057                .publish_room_message(room_id.to_string(), bytes)
2058                .await;
2059        }
2060
2061        // Stream chunks. Brief pacing so gossipsub doesn't see a thundering
2062        // herd from a single peer.
2063        let net = self.network.clone();
2064        let room = room_id.to_string();
2065        let our = our_fp.clone();
2066        let fid = file_id.clone();
2067        let chunks = plan.chunks.clone();
2068        tokio::spawn(async move {
2069            for (i, data) in chunks.iter().enumerate() {
2070                let msg = RoomMessage::FileChunk {
2071                    sender_fingerprint: our.clone(),
2072                    file_id: fid.clone(),
2073                    chunk_index: i as u32,
2074                    total_chunks: total,
2075                    data_b64: B64.encode(data),
2076                };
2077                if let Ok(bytes) = encode_wire(&msg) {
2078                    net.publish_room_message(room.clone(), bytes).await;
2079                }
2080                tokio::time::sleep(Duration::from_millis(40)).await;
2081            }
2082        });
2083
2084        Ok(file_id)
2085    }
2086
2087    /// Save a completed/ready attachment to the user's Downloads folder.
2088    /// Decrypts encrypted attachments on the way out.
2089    pub async fn save_to_downloads(&self, room_id: &str, file_id: &str) -> Result<PathBuf> {
2090        let attachment = repo::get_attachment(&self.db, room_id, file_id)?
2091            .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
2092        if !matches!(
2093            attachment.status,
2094            AttachmentStatus::Ready | AttachmentStatus::Saved
2095        ) {
2096            return Err(HuddleError::Other(format!(
2097                "attachment is not ready (status={})",
2098                attachment.status.as_str()
2099            )));
2100        }
2101        // Our own encrypted attachment: the file_manager cache holds the
2102        // ciphertext and we have no inbound Megolm session keyed by
2103        // ourselves, so it can't be decrypted back. But `saved_path` still
2104        // points at the original plaintext we sent — copy from there.
2105        let plaintext = if attachment.encrypted
2106            && attachment.sender_fingerprint == self.identity.fingerprint()
2107        {
2108            match attachment
2109                .saved_path
2110                .as_deref()
2111                .filter(|p| Path::new(p).exists())
2112            {
2113                Some(src) => std::fs::read(src)?,
2114                None => {
2115                    return Err(HuddleError::Other(
2116                        "your original file has moved or been deleted — it can't be \
2117                         recovered from the encrypted cache"
2118                            .into(),
2119                    ));
2120                }
2121            }
2122        } else {
2123            let cached = self.file_manager.read_cache(file_id)?;
2124            if attachment.encrypted {
2125                let meta = EncryptedFileMeta {
2126                    megolm_session_id: attachment
2127                        .megolm_session_id
2128                        .clone()
2129                        .ok_or_else(|| HuddleError::Other("missing megolm_session_id".into()))?,
2130                    wrapped_key_b64: attachment
2131                        .wrapped_key
2132                        .clone()
2133                        .ok_or_else(|| HuddleError::Other("missing wrapped_key".into()))?,
2134                    nonce_b64: attachment
2135                        .nonce
2136                        .clone()
2137                        .ok_or_else(|| HuddleError::Other("missing nonce".into()))?,
2138                    content_hash: attachment
2139                        .content_hash
2140                        .clone()
2141                        .ok_or_else(|| HuddleError::Other("missing content_hash".into()))?,
2142                };
2143                self.decrypt_attachment(
2144                    room_id,
2145                    &attachment.sender_fingerprint,
2146                    &cached,
2147                    &meta,
2148                )?
2149            } else {
2150                cached
2151            }
2152        };
2153        let saved = self.file_manager.write_to_downloads(&attachment.name, &plaintext)?;
2154        repo::update_attachment_paths(
2155            &self.db,
2156            room_id,
2157            file_id,
2158            None,
2159            Some(&saved.to_string_lossy()),
2160        )?;
2161        repo::update_attachment_status(&self.db, room_id, file_id, AttachmentStatus::Saved, None)?;
2162        let _ = self.app_event_tx.send(AppEvent::FileSaved {
2163            file_id: file_id.into(),
2164            path: saved.to_string_lossy().into(),
2165        });
2166        Ok(saved)
2167    }
2168
2169    /// Drop any in-flight chunks and remove the attachment row.
2170    pub async fn cancel_transfer(&self, room_id: &str, file_id: &str) -> Result<()> {
2171        self.file_manager.cancel_incoming(file_id);
2172        repo::update_attachment_status(
2173            &self.db,
2174            room_id,
2175            file_id,
2176            AttachmentStatus::Cancelled,
2177            None,
2178        )?;
2179        Ok(())
2180    }
2181
2182    /// Launch the system's default opener on a saved file.
2183    pub fn open_saved(&self, room_id: &str, file_id: &str) -> Result<()> {
2184        let attachment = repo::get_attachment(&self.db, room_id, file_id)?
2185            .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
2186        let path = attachment
2187            .saved_path
2188            .ok_or_else(|| HuddleError::Other("not saved yet — press Enter to save first".into()))?;
2189        open_with_system(&path)
2190    }
2191
2192    pub fn list_room_attachments(&self, room_id: &str) -> Result<Vec<StoredAttachment>> {
2193        repo::list_room_attachments(&self.db, room_id)
2194    }
2195
2196    /// Mark a peer's fingerprint as verified in the given room. Used by
2197    /// the `^V` verification modal after the user has compared the
2198    /// fingerprint out-of-band.
2199    pub fn set_member_verified(
2200        &self,
2201        room_id: &str,
2202        fingerprint: &str,
2203        verified: bool,
2204    ) -> Result<()> {
2205        // Make sure there's a member row to flip — peer_id is unknown
2206        // at this layer when the user verifies an out-of-band identity,
2207        // so we use the fingerprint as the canonical identity key with
2208        // an empty peer_id placeholder if none exists.
2209        let members = repo::list_room_members(&self.db, room_id).unwrap_or_default();
2210        if !members.iter().any(|m| m.fingerprint == fingerprint) {
2211            repo::upsert_room_member(
2212                &self.db,
2213                &StoredRoomMember {
2214                    room_id: room_id.to_string(),
2215                    peer_id: String::new(),
2216                    fingerprint: fingerprint.to_string(),
2217                    last_seen: Some(now_unix()),
2218                    verified,
2219                    ed25519_pubkey: None,
2220                    role: "member".into(),
2221                },
2222            )?;
2223        }
2224        repo::set_member_verified(&self.db, room_id, fingerprint, verified)
2225    }
2226
2227    pub fn verified_fingerprints(&self, room_id: &str) -> Vec<String> {
2228        repo::list_verified_fingerprints(&self.db, room_id).unwrap_or_default()
2229    }
2230
2231    /// Phase B: is `fingerprint` an owner of `room_id`? Used by the TUI
2232    /// to gate `^K` / `^G` and the kick/grant member-picker actions.
2233    pub fn is_owner(&self, room_id: &str, fingerprint: &str) -> bool {
2234        repo::list_room_owners(&self.db, room_id)
2235            .unwrap_or_default()
2236            .iter()
2237            .any(|fp| fp == fingerprint)
2238    }
2239
2240    pub fn we_are_owner(&self, room_id: &str) -> bool {
2241        self.is_owner(room_id, &self.identity.fingerprint().to_string())
2242    }
2243
2244    /// Phase B: list current owner fingerprints for `room_id` — used to
2245    /// render an owner badge in the member panel.
2246    pub fn room_owners(&self, room_id: &str) -> Vec<String> {
2247        repo::list_room_owners(&self.db, room_id).unwrap_or_default()
2248    }
2249
2250    /// Phase E: global toggle — when true, inbound dials from
2251    /// unverified fingerprints are auto-rejected without prompting.
2252    pub fn verified_only_inbound(&self) -> bool {
2253        repo::get_setting(&self.db, "verified_only_inbound")
2254            .unwrap_or(None)
2255            .map(|v| v == "1")
2256            .unwrap_or(false)
2257    }
2258
2259    pub fn set_verified_only_inbound(&self, on: bool) -> Result<()> {
2260        repo::set_setting(&self.db, "verified_only_inbound", if on { "1" } else { "0" })
2261    }
2262
2263    /// Phase E: per-room verified-only-join. When true, the host (and
2264    /// every honest existing member) drops MemberAnnounce from joiners
2265    /// who aren't globally SAS-verified, and the lowest-fp owner sends
2266    /// back a signed `JoinRefused` so the joiner sees an explanation.
2267    pub fn room_verified_only(&self, room_id: &str) -> bool {
2268        repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
2269    }
2270
2271    pub fn set_room_verified_only(&self, room_id: &str, on: bool) -> Result<()> {
2272        repo::set_room_verified_only(&self.db, room_id, on)
2273    }
2274
2275    /// Phase H: first-launch onboarding flag.
2276    pub fn onboarding_seen(&self) -> bool {
2277        repo::is_onboarding_seen(&self.db).unwrap_or(true)
2278    }
2279
2280    pub fn mark_onboarding_seen(&self) -> Result<()> {
2281        repo::mark_onboarding_seen(&self.db)
2282    }
2283
2284    /// Phase B: promote `target_fingerprint` to owner. Builds a signed
2285    /// `OwnerGrant`, broadcasts it, and applies it locally. Returns an
2286    /// error if we ourselves aren't an owner — only owners can grant.
2287    pub async fn grant_owner(&self, room_id: &str, target_fingerprint: &str) -> Result<()> {
2288        let our_fp = self.identity.fingerprint().to_string();
2289        if !self.is_owner(room_id, &our_fp) {
2290            return Err(HuddleError::Other(
2291                "only an owner can grant owner".into(),
2292            ));
2293        }
2294        let msg = RoomMessage::OwnerGrant {
2295            room_id: room_id.to_string(),
2296            target_fingerprint: target_fingerprint.to_string(),
2297        };
2298        let env = crate::crypto::sign_message(&self.identity, &msg)?;
2299        let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2300        self.network
2301            .publish_room_message(room_id.to_string(), bytes)
2302            .await;
2303        // Apply locally too — peers will converge on the next announce.
2304        repo::set_member_role(&self.db, room_id, target_fingerprint, "owner")?;
2305        Ok(())
2306    }
2307
2308    /// Phase B: kick `target_fingerprint` from `room_id`. Broadcasts a
2309    /// signed `BanMember`, records the ban locally, then immediately
2310    /// rotates the room key under a freshly-generated passphrase. Returns
2311    /// the new passphrase so the caller can show it to the owner for
2312    /// out-of-band sharing with remaining members.
2313    ///
2314    /// The rotation is the cryptographic enforcement: a banned peer can
2315    /// still subscribe to the gossipsub topic and see the ciphertext,
2316    /// but they can't unwrap the new session key without the new
2317    /// passphrase, so they can't decrypt anything sent after the kick.
2318    pub async fn kick_member(
2319        &self,
2320        room_id: &str,
2321        target_fingerprint: &str,
2322    ) -> Result<String> {
2323        let our_fp = self.identity.fingerprint().to_string();
2324        if !self.is_owner(room_id, &our_fp) {
2325            return Err(HuddleError::Other("only an owner can kick".into()));
2326        }
2327        if target_fingerprint == our_fp {
2328            return Err(HuddleError::Other("can't kick yourself".into()));
2329        }
2330        let info = self
2331            .active_rooms
2332            .lock()
2333            .unwrap()
2334            .get(room_id)
2335            .map(|r| r.info.clone())
2336            .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2337        if !info.encrypted {
2338            // Without a key to rotate, a "kick" is purely advisory —
2339            // ban only. Honest clients drop their messages, but anyone
2340            // can still read the room. Honest in v1; documented.
2341            let msg = RoomMessage::BanMember {
2342                room_id: room_id.to_string(),
2343                target_fingerprint: target_fingerprint.to_string(),
2344            };
2345            let env = crate::crypto::sign_message(&self.identity, &msg)?;
2346            let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2347            self.network
2348                .publish_room_message(room_id.to_string(), bytes)
2349                .await;
2350            repo::add_room_ban(
2351                &self.db,
2352                room_id,
2353                target_fingerprint,
2354                &our_fp,
2355                &env.signature_b64,
2356                now_unix(),
2357            )?;
2358            self.evict_banned_member(room_id, target_fingerprint);
2359            return Ok(String::new());
2360        }
2361        // Encrypted room — full kick path.
2362        let new_passphrase = generate_join_passphrase();
2363        let msg = RoomMessage::BanMember {
2364            room_id: room_id.to_string(),
2365            target_fingerprint: target_fingerprint.to_string(),
2366        };
2367        let env = crate::crypto::sign_message(&self.identity, &msg)?;
2368        let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2369        self.network
2370            .publish_room_message(room_id.to_string(), bytes)
2371            .await;
2372        repo::add_room_ban(
2373            &self.db,
2374            room_id,
2375            target_fingerprint,
2376            &our_fp,
2377            &env.signature_b64,
2378            now_unix(),
2379        )?;
2380        self.evict_banned_member(room_id, target_fingerprint);
2381        // Reuse the existing rotation flow so all the existing salt /
2382        // session / persistence logic stays in one place.
2383        self.rotate_room(room_id, &new_passphrase).await?;
2384        Ok(new_passphrase)
2385    }
2386
2387    /// Phase F: generate an 8-char alphanumeric join code for `room_id`,
2388    /// good for 10 minutes. Stored in memory only on the issuing owner's
2389    /// machine — a single use clears it. Caller is responsible for
2390    /// sharing the code OOB with the prospective joiner.
2391    ///
2392    /// Owner-only. Errors if `room_id` isn't active or we're not an owner.
2393    pub fn generate_join_code(&self, room_id: &str) -> Result<String> {
2394        let our_fp = self.identity.fingerprint().to_string();
2395        if !self.is_owner(room_id, &our_fp) {
2396            return Err(HuddleError::Other(
2397                "only an owner can issue join codes".into(),
2398            ));
2399        }
2400        let code = generate_alphanumeric_code(8);
2401        let expires_at = now_unix() + 10 * 60;
2402        let mut rooms = self.active_rooms.lock().unwrap();
2403        let room = rooms
2404            .get_mut(room_id)
2405            .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2406        // Prune expired entries while we're here so the list doesn't grow.
2407        let now = now_unix();
2408        room.issued_codes.retain(|(_, exp)| *exp > now);
2409        room.issued_codes.push((code.clone(), expires_at));
2410        Ok(code)
2411    }
2412
2413    /// Phase F: join `room_id` using a short-lived code instead of the
2414    /// passphrase. Generates an ephemeral X25519 keypair, broadcasts a
2415    /// signed `CodeJoinRequest`, and waits for the owner's
2416    /// `CodeJoinResponse`. The receive arm builds an `ActiveRoom`
2417    /// flagged read-only (no passphrase = can't share our outbound
2418    /// session key with others).
2419    pub async fn join_room_with_code(
2420        &self,
2421        room_id: &str,
2422        code: &str,
2423    ) -> Result<()> {
2424        // Resolve discovered metadata so we know name/encrypted/etc.
2425        let info = {
2426            let d = self.discovered_rooms.lock().unwrap().get(room_id).cloned();
2427            match d {
2428                Some(d) => StoredRoom {
2429                    id: room_id.to_string(),
2430                    name: d.name,
2431                    creator_fingerprint: d.creator_fingerprint,
2432                    encrypted: d.encrypted,
2433                    passphrase_salt: None, // unused on code-join path
2434                    created_at: now_unix(),
2435                    last_active: Some(now_unix()),
2436                },
2437                None => {
2438                    return Err(HuddleError::Other(format!(
2439                        "room {room_id} not visible — wait for an announcement"
2440                    )))
2441                }
2442            }
2443        };
2444        if !info.encrypted {
2445            return Err(HuddleError::Other(
2446                "code-join only applies to encrypted rooms".into(),
2447            ));
2448        }
2449        let our_fp = self.identity.fingerprint().to_string();
2450        // Generate ephemeral X25519 keypair; remember the secret so the
2451        // CodeJoinResponse receive arm can complete ECDH on this peer.
2452        use x25519_dalek::{PublicKey, StaticSecret};
2453        let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
2454        let our_pub = PublicKey::from(&our_secret);
2455        // Stash the secret keyed by room_id; the response handler
2456        // matches on target_fingerprint=our_fp + room_id.
2457        self.pending_code_secrets
2458            .lock()
2459            .unwrap()
2460            .insert(room_id.to_string(), our_secret);
2461        // Create a placeholder ActiveRoom with no crypto yet; we'll
2462        // fill in the inbound session in the response handler.
2463        self.active_rooms.lock().unwrap().insert(
2464            room_id.to_string(),
2465            ActiveRoom {
2466                info: info.clone(),
2467                crypto: Some(RoomCrypto::new_for_room(
2468                    self.db.clone(),
2469                    room_id.to_string(),
2470                    our_fp.clone(),
2471                    self.session_persist_key,
2472                )?),
2473                passphrase_key: None,
2474                members: {
2475                    let mut s = HashSet::new();
2476                    s.insert(our_fp.clone());
2477                    s
2478                },
2479                typers: HashMap::new(),
2480                read_only: true,
2481                issued_codes: Vec::new(),
2482            },
2483        );
2484        self.network.subscribe_room(room_id.to_string()).await;
2485        // Broadcast the request.
2486        let req = RoomMessage::CodeJoinRequest {
2487            room_id: room_id.to_string(),
2488            joiner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2489            code: code.to_string(),
2490        };
2491        let env = crate::crypto::sign_message(&self.identity, &req)?;
2492        let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2493        self.network
2494            .publish_room_message(room_id.to_string(), bytes)
2495            .await;
2496        // Emit RoomJoined so the TUI opens the tab. Subsequent ability
2497        // to read messages depends on receiving the owner's response.
2498        let _ = self.app_event_tx.send(AppEvent::RoomJoined {
2499            room_id: room_id.to_string(),
2500        });
2501        Ok(())
2502    }
2503
2504    /// Phase G: start an SAS verification with `target_fingerprint` in
2505    /// `room_id`. Returns the tx_id so the caller can correlate
2506    /// subsequent events. The full flow is asynchronous — the partner
2507    /// must accept on their end, both compute the ECDH-derived SAS
2508    /// code, OOB-compare it, and each press Match.
2509    pub async fn sas_start(&self, room_id: &str, target_fingerprint: &str) -> Result<String> {
2510        let (tx_id_bytes, our_secret, our_pub) = crate::crypto::sas::new_session();
2511        let tx_id = B64.encode(tx_id_bytes);
2512        let msg = RoomMessage::SasInit {
2513            tx_id: tx_id.clone(),
2514            ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2515            target_fingerprint: target_fingerprint.to_string(),
2516        };
2517        let env = crate::crypto::sign_message(&self.identity, &msg)?;
2518        let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2519        self.sas_flows.lock().unwrap().insert(
2520            tx_id.clone(),
2521            SasFlow {
2522                room_id: room_id.to_string(),
2523                partner_fingerprint: target_fingerprint.to_string(),
2524                our_secret,
2525                sas_code: None,
2526                our_confirmed: false,
2527                their_confirmed: false,
2528            },
2529        );
2530        self.network
2531            .publish_room_message(room_id.to_string(), bytes)
2532            .await;
2533        Ok(tx_id)
2534    }
2535
2536    /// Phase G: user pressed Match on the SAS code modal — broadcast our
2537    /// signed `SasConfirm{matched: true}`. If the partner has already
2538    /// matched, this completes verification on both sides.
2539    pub async fn sas_match(&self, tx_id: &str) -> Result<()> {
2540        let (room_id, partner_fp, both_done) = {
2541            let mut flows = self.sas_flows.lock().unwrap();
2542            let flow = flows
2543                .get_mut(tx_id)
2544                .ok_or_else(|| HuddleError::Other("unknown SAS tx_id".into()))?;
2545            flow.our_confirmed = true;
2546            (
2547                flow.room_id.clone(),
2548                flow.partner_fingerprint.clone(),
2549                flow.our_confirmed && flow.their_confirmed,
2550            )
2551        };
2552        let msg = RoomMessage::SasConfirm {
2553            tx_id: tx_id.to_string(),
2554            matched: true,
2555        };
2556        let env = crate::crypto::sign_message(&self.identity, &msg)?;
2557        let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2558        self.network
2559            .publish_room_message(room_id.clone(), bytes)
2560            .await;
2561        if both_done {
2562            self.finish_sas(tx_id, &room_id, &partner_fp).await?;
2563        }
2564        Ok(())
2565    }
2566
2567    /// Phase G: cancel an in-flight SAS — drop our local state. Doesn't
2568    /// broadcast a "matched=false" notice in v1 (partner's flow stays
2569    /// dangling; they can cancel their side too). Quiet teardown.
2570    pub fn sas_cancel(&self, tx_id: &str) {
2571        self.sas_flows.lock().unwrap().remove(tx_id);
2572    }
2573
2574    /// Phase G internal: both sides have confirmed — flip the partner's
2575    /// fingerprint to verified (per-room AND global) and clean up.
2576    async fn finish_sas(
2577        &self,
2578        tx_id: &str,
2579        room_id: &str,
2580        partner_fingerprint: &str,
2581    ) -> Result<()> {
2582        repo::set_member_verified(&self.db, room_id, partner_fingerprint, true)?;
2583        repo::add_verified_peer(&self.db, partner_fingerprint, now_unix())?;
2584        self.sas_flows.lock().unwrap().remove(tx_id);
2585        let _ = self.app_event_tx.send(AppEvent::SasVerified {
2586            room_id: room_id.to_string(),
2587            partner_fingerprint: partner_fingerprint.to_string(),
2588        });
2589        Ok(())
2590    }
2591
2592    /// Phase B internal: drop a banned member's in-memory presence in a
2593    /// room. Persistent ban already went to `room_bans`. Called from
2594    /// `kick_member` (locally banning ourselves) and from the
2595    /// `RoomMessage::BanMember` receive arm (peer-initiated ban).
2596    fn evict_banned_member(&self, room_id: &str, fingerprint: &str) {
2597        if let Some(room) = self.active_rooms.lock().unwrap().get_mut(room_id) {
2598            room.members.remove(fingerprint);
2599        }
2600        let _ = self.app_event_tx.send(AppEvent::MemberLeft {
2601            room_id: room_id.to_string(),
2602            fingerprint: fingerprint.to_string(),
2603        });
2604    }
2605
2606    pub fn display_name(&self) -> Option<String> {
2607        repo::get_display_name(&self.db).unwrap_or(None)
2608    }
2609
2610    pub fn set_display_name(&self, name: Option<&str>) -> Result<()> {
2611        repo::set_display_name(&self.db, name)
2612    }
2613
2614    /// Look up the display name we've seen for a peer in any room.
2615    pub fn lookup_member_display_name(&self, fingerprint: &str) -> Option<String> {
2616        repo::lookup_display_name(&self.db, fingerprint).unwrap_or(None)
2617    }
2618
2619    pub fn is_room_muted(&self, room_id: &str) -> bool {
2620        repo::is_room_muted(&self.db, room_id).unwrap_or(false)
2621    }
2622
2623    pub fn set_room_muted(&self, room_id: &str, muted: bool) -> Result<()> {
2624        repo::set_room_muted(&self.db, room_id, muted)
2625    }
2626
2627    /// Broadcast a "I'm typing" pulse to the given room. Caller is
2628    /// responsible for debouncing (don't fire more than every ~500ms).
2629    pub async fn broadcast_typing(&self, room_id: &str) {
2630        if !self.active_rooms.lock().unwrap().contains_key(room_id) {
2631            return;
2632        }
2633        let msg = RoomMessage::Typing {
2634            sender_fingerprint: self.identity.fingerprint().to_string(),
2635        };
2636        if let Ok(bytes) = encode_wire(&msg) {
2637            self.network
2638                .publish_room_message(room_id.to_string(), bytes)
2639                .await;
2640        }
2641    }
2642
2643    /// Returns the fingerprints of peers currently typing in `room_id`,
2644    /// pruning entries past their TTL.
2645    pub fn typers_in_room(&self, room_id: &str) -> Vec<String> {
2646        let now = now_unix();
2647        let mut rooms = self.active_rooms.lock().unwrap();
2648        let room = match rooms.get_mut(room_id) {
2649            Some(r) => r,
2650            None => return Vec::new(),
2651        };
2652        room.typers.retain(|_, exp| *exp > now);
2653        let mut v: Vec<String> = room.typers.keys().cloned().collect();
2654        v.sort();
2655        v
2656    }
2657
2658    // -------------------------------------------------------------------
2659    // Room key rotation
2660    // -------------------------------------------------------------------
2661
2662    /// Rotate this room's outbound Megolm session under a fresh
2663    /// passphrase. Broadcasts `RotateRoomKey` (so other members know to
2664    /// expect a new passphrase) and a fresh `MemberAnnounce` with the
2665    /// new wrapped session key. Old inbound sessions stay in storage
2666    /// for decrypting historic messages.
2667    pub async fn rotate_room(&self, room_id: &str, new_passphrase: &str) -> Result<()> {
2668        if new_passphrase.is_empty() {
2669            return Err(HuddleError::Other("new passphrase is empty".into()));
2670        }
2671        let new_salt = passphrase::random_salt();
2672        let new_key = passphrase::derive_key(new_passphrase, &new_salt)?;
2673
2674        let info = {
2675            let mut rooms = self.active_rooms.lock().unwrap();
2676            let room = rooms
2677                .get_mut(room_id)
2678                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2679            if !room.info.encrypted {
2680                return Err(HuddleError::Other(
2681                    "rotation only applies to encrypted rooms".into(),
2682                ));
2683            }
2684            // Generate a fresh outbound Megolm session for this member.
2685            let new_crypto = RoomCrypto::new_for_room(
2686                self.db.clone(),
2687                room_id.to_string(),
2688                self.identity.fingerprint().to_string(),
2689                self.session_persist_key,
2690            )?;
2691            room.crypto = Some(new_crypto);
2692            room.passphrase_key = Some(new_key);
2693            room.info.passphrase_salt = Some(new_salt.to_vec());
2694            room.info.clone()
2695        };
2696
2697        // Broadcast before persisting: peers learn about the rotation even
2698        // if we crash before the DB write lands, and our own restore path
2699        // can recover from the persisted Megolm session plus the announced
2700        // salt. Persisting first would risk a DB row that's ahead of what
2701        // any peer knows.
2702        let rot = RoomMessage::RotateRoomKey {
2703            rotator_fingerprint: self.identity.fingerprint().to_string(),
2704            new_salt: new_salt.to_vec(),
2705        };
2706        if let Ok(bytes) = encode_wire(&rot) {
2707            self.network
2708                .publish_room_message(room_id.to_string(), bytes)
2709                .await;
2710        }
2711        // Re-announce ourselves with the new wrapped session key.
2712        if let Err(e) = self.broadcast_member_announce(room_id).await {
2713            warn!(%e, "rotate: broadcast announce failed");
2714        }
2715
2716        // Now persist the new salt on the stored row.
2717        repo::insert_room(&self.db, &info)?;
2718        Ok(())
2719    }
2720
2721    /// Used by the TUI when another member rotates a room we're in.
2722    /// Derives the new key, updates our local state, and re-announces
2723    /// so the rotator can share their fresh outbound session with us.
2724    pub async fn accept_rotation(
2725        &self,
2726        room_id: &str,
2727        new_salt: &[u8],
2728        new_passphrase: &str,
2729    ) -> Result<()> {
2730        let new_key = passphrase::derive_key(new_passphrase, new_salt)?;
2731        let info = {
2732            let mut rooms = self.active_rooms.lock().unwrap();
2733            let room = rooms
2734                .get_mut(room_id)
2735                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2736            room.passphrase_key = Some(new_key);
2737            room.info.passphrase_salt = Some(new_salt.to_vec());
2738            room.info.clone()
2739        };
2740        // Ask the rotator (and anyone) to re-share their session key
2741        // before persisting, so a crash before the DB write still leaves
2742        // peers aware we've moved to the new salt.
2743        let req = RoomMessage::SessionKeyRequest {
2744            requester_fingerprint: self.identity.fingerprint().to_string(),
2745        };
2746        if let Ok(bytes) = encode_wire(&req) {
2747            self.network
2748                .publish_room_message(room_id.to_string(), bytes)
2749                .await;
2750        }
2751        repo::insert_room(&self.db, &info)?;
2752        Ok(())
2753    }
2754
2755    // -------------------------------------------------------------------
2756    // File transfer — internal handlers
2757    // -------------------------------------------------------------------
2758
2759    #[allow(clippy::too_many_arguments)]
2760    fn handle_file_offer(
2761        &self,
2762        room_id: &str,
2763        sender_fingerprint: String,
2764        file_id: String,
2765        name: String,
2766        size_bytes: u64,
2767        mime: Option<String>,
2768        _chunk_count: u32,
2769        encrypted_meta: Option<EncryptedFileMeta>,
2770    ) {
2771        let encrypted = encrypted_meta.is_some();
2772        let attachment = StoredAttachment {
2773            id: 0,
2774            room_id: room_id.to_string(),
2775            message_id: None,
2776            sender_fingerprint: sender_fingerprint.clone(),
2777            file_id: file_id.clone(),
2778            name: name.clone(),
2779            mime,
2780            size_bytes: size_bytes as i64,
2781            status: AttachmentStatus::Offered,
2782            cache_path: None,
2783            saved_path: None,
2784            error: None,
2785            encrypted,
2786            wrapped_key: encrypted_meta.as_ref().map(|m| m.wrapped_key_b64.clone()),
2787            nonce: encrypted_meta.as_ref().map(|m| m.nonce_b64.clone()),
2788            megolm_session_id: encrypted_meta.as_ref().map(|m| m.megolm_session_id.clone()),
2789            content_hash: encrypted_meta.as_ref().map(|m| m.content_hash.clone()),
2790            created_at: now_unix(),
2791        };
2792        if let Err(e) = repo::upsert_attachment(&self.db, &attachment) {
2793            warn!(%e, "upsert attachment");
2794            return;
2795        }
2796        // If chunks started arriving before this offer, the transfer's
2797        // size denominator was a guess — correct it with the real size.
2798        self.file_manager.set_expected_size(&file_id, size_bytes);
2799        let _ = self.app_event_tx.send(AppEvent::FileOffered {
2800            room_id: room_id.to_string(),
2801            file_id,
2802            name,
2803            size_bytes,
2804            sender_fingerprint,
2805        });
2806    }
2807
2808    fn handle_file_chunk(
2809        &self,
2810        room_id: &str,
2811        _sender_fingerprint: String,
2812        file_id: String,
2813        chunk_index: u32,
2814        total_chunks: u32,
2815        data_b64: String,
2816    ) {
2817        let data = match B64.decode(&data_b64) {
2818            Ok(d) => d,
2819            Err(e) => {
2820                warn!(%e, "bad chunk base64");
2821                return;
2822            }
2823        };
2824        // Pull the announced size + lifecycle state from our stored offer.
2825        // A terminal-state row means the user cancelled or the transfer
2826        // already failed — late chunks must not resurrect it.
2827        let expected_size = match repo::get_attachment(&self.db, room_id, &file_id) {
2828            Ok(Some(a)) => {
2829                if matches!(
2830                    a.status,
2831                    AttachmentStatus::Cancelled | AttachmentStatus::Failed
2832                ) {
2833                    return;
2834                }
2835                a.size_bytes as u64
2836            }
2837            Ok(None) => crate::files::MAX_FILE_SIZE,
2838            Err(e) => {
2839                warn!(%e, "get attachment for chunk");
2840                crate::files::MAX_FILE_SIZE
2841            }
2842        };
2843
2844        let result = self.file_manager.accept_chunk(
2845            &file_id,
2846            chunk_index,
2847            total_chunks,
2848            data,
2849            expected_size,
2850        );
2851        match result {
2852            Ok(None) => {
2853                // Move offered → downloading on first chunk.
2854                let _ = repo::update_attachment_status(
2855                    &self.db,
2856                    room_id,
2857                    &file_id,
2858                    AttachmentStatus::Downloading,
2859                    None,
2860                );
2861                // Best-effort progress event — we know we've processed
2862                // (chunk_index+1)/total_chunks chunks.
2863                let bytes_so_far = self
2864                    .file_manager
2865                    .progress(&file_id)
2866                    .map(|(b, _)| b)
2867                    .unwrap_or(0);
2868                let _ = self.app_event_tx.send(AppEvent::FileProgress {
2869                    file_id: file_id.clone(),
2870                    bytes_received: bytes_so_far,
2871                    total_bytes: expected_size,
2872                });
2873            }
2874            Ok(Some(completed)) => {
2875                let _ = repo::update_attachment_paths(
2876                    &self.db,
2877                    room_id,
2878                    &file_id,
2879                    Some(&completed.cache_path.to_string_lossy()),
2880                    None,
2881                );
2882                let _ = repo::update_attachment_status(
2883                    &self.db,
2884                    room_id,
2885                    &file_id,
2886                    AttachmentStatus::Ready,
2887                    None,
2888                );
2889                let _ = self.app_event_tx.send(AppEvent::FileReady {
2890                    file_id: file_id.clone(),
2891                });
2892            }
2893            Err(e) => {
2894                let msg = e.to_string();
2895                warn!(%msg, "chunk processing failed");
2896                let _ = repo::update_attachment_status(
2897                    &self.db,
2898                    room_id,
2899                    &file_id,
2900                    AttachmentStatus::Failed,
2901                    Some(&msg),
2902                );
2903                let _ = self.app_event_tx.send(AppEvent::FileFailed {
2904                    file_id: file_id.clone(),
2905                    reason: msg,
2906                });
2907            }
2908        }
2909    }
2910
2911    /// Emit MentionReceived if `body` contains either our full
2912    /// fingerprint or its short form (first hex group).
2913    fn maybe_emit_mention(&self, room_id: &str, body: &str) {
2914        let full = self.identity.fingerprint().to_lowercase();
2915        // First hex group, e.g. "a3b1" of "a3b1-c2d4-...".
2916        let short: &str = full.split('-').next().unwrap_or(&full);
2917        let lower = body.to_lowercase();
2918        // The full fingerprint anywhere counts; the short form counts only
2919        // as a standalone hex token, so it can't match an arbitrary
2920        // substring of an unrelated hash, URL, or word.
2921        let hit = lower.contains(full.as_str())
2922            || lower
2923                .split(|c: char| !c.is_ascii_hexdigit())
2924                .any(|tok| tok == short);
2925        if hit {
2926            let _ = self.app_event_tx.send(AppEvent::MentionReceived {
2927                room_id: room_id.to_string(),
2928                body: body.to_string(),
2929            });
2930        }
2931    }
2932
2933    fn decrypt_attachment(
2934        &self,
2935        room_id: &str,
2936        sender_fingerprint: &str,
2937        ciphertext: &[u8],
2938        meta: &EncryptedFileMeta,
2939    ) -> Result<Vec<u8>> {
2940        let mut rooms = self.active_rooms.lock().unwrap();
2941        let room = rooms
2942            .get_mut(room_id)
2943            .ok_or_else(|| HuddleError::Other("not in room".into()))?;
2944        let crypto = room
2945            .crypto
2946            .as_mut()
2947            .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
2948        file_encryption::decrypt_file(ciphertext, meta, crypto, sender_fingerprint)
2949    }
2950}
2951
2952/// Use the platform's default opener on `path`.
2953fn open_with_system(path: &str) -> Result<()> {
2954    #[cfg(target_os = "macos")]
2955    let cmd = "open";
2956    #[cfg(target_os = "linux")]
2957    let cmd = "xdg-open";
2958    #[cfg(target_os = "windows")]
2959    let cmd = "cmd";
2960    #[cfg(target_os = "windows")]
2961    let args = vec!["/C", "start", "", path];
2962    #[cfg(not(target_os = "windows"))]
2963    let args = vec![path];
2964
2965    std::process::Command::new(cmd)
2966        .args(args)
2967        .spawn()
2968        .map_err(|e| HuddleError::Other(format!("spawn opener: {e}")))?;
2969    Ok(())
2970}
2971
2972// Module-level salt cache: room_id -> salt. Populated when we receive
2973// announcements; queried by join_room.
2974static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
2975    std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
2976
2977#[allow(dead_code)]
2978fn salt_len() -> usize {
2979    SALT_LEN
2980}
2981
2982fn now_unix() -> i64 {
2983    SystemTime::now()
2984        .duration_since(UNIX_EPOCH)
2985        .unwrap()
2986        .as_secs() as i64
2987}
2988
2989/// Phase B: generate a fresh 24-char base64-ish passphrase for the
2990/// rotation that follows a kick. Sourced from `OsRng` directly so the
2991/// kicker doesn't have to think up a strong one on the spot. Returned
2992/// to the owner via the kick-result modal for OOB sharing with the
2993/// remaining members.
2994fn generate_join_passphrase() -> String {
2995    use rand::RngCore;
2996    let mut bytes = [0u8; 16];
2997    rand::thread_rng().fill_bytes(&mut bytes);
2998    // Use URL-safe-no-pad so the user can read aloud / paste without
2999    // worrying about `=` padding or `+` getting URL-escaped.
3000    base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes)
3001}
3002
3003/// Phase F: short human-readable join code. 8 chars from a 32-symbol
3004/// alphabet (no easily-confused chars like 0/O/I/1) ≈ 40 bits — plenty
3005/// for a 10-minute online gate since the owner's client checks
3006/// exact-match (not brute-force-able offline).
3007fn generate_alphanumeric_code(len: usize) -> String {
3008    use rand::Rng;
3009    const ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
3010    let mut rng = rand::thread_rng();
3011    let mut out = String::with_capacity(len + 1);
3012    for i in 0..len {
3013        if i == 4 && len == 8 {
3014            out.push('-'); // pretty: XXXX-XXXX
3015        }
3016        let idx = rng.gen_range(0..ALPHABET.len());
3017        out.push(ALPHABET[idx] as char);
3018    }
3019    out
3020}
3021
3022#[cfg(test)]
3023mod parser_tests {
3024    use super::parse_dial_address;
3025
3026    #[test]
3027    fn parses_ipv4_port() {
3028        let m = parse_dial_address("10.3.72.53:9027").unwrap();
3029        assert_eq!(m.to_string(), "/ip4/10.3.72.53/tcp/9027");
3030    }
3031
3032    #[test]
3033    fn parses_bracketed_ipv6() {
3034        let m = parse_dial_address("[::1]:9027").unwrap();
3035        assert_eq!(m.to_string(), "/ip6/::1/tcp/9027");
3036    }
3037
3038    #[test]
3039    fn rejects_unbracketed_ipv6() {
3040        let err = parse_dial_address("fe80::1:9027").unwrap_err();
3041        assert!(err.to_string().contains("brackets"));
3042    }
3043
3044    #[test]
3045    fn passes_through_raw_multiaddr() {
3046        let m = parse_dial_address("/ip4/1.2.3.4/tcp/9000").unwrap();
3047        assert_eq!(m.to_string(), "/ip4/1.2.3.4/tcp/9000");
3048    }
3049
3050    #[test]
3051    fn empty_address_is_error() {
3052        assert!(parse_dial_address("   ").is_err());
3053    }
3054
3055    #[test]
3056    fn rejects_bad_port() {
3057        assert!(parse_dial_address("1.2.3.4:notaport").is_err());
3058    }
3059}