Skip to main content

huddle_core/app/
mod.rs

1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use libp2p::{Multiaddr, PeerId};
8use tokio::sync::broadcast;
9use tracing::{debug, error, info, warn};
10
11use crate::config;
12use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
13use crate::crypto::RoomCrypto;
14use crate::error::{HuddleError, Result};
15use crate::identity::Identity;
16use crate::network::events::NetworkEvent;
17use crate::network::protocol::{RoomAnnouncement, RoomMessage};
18use crate::network::{self, NetworkHandle, NetworkMode};
19use crate::storage::repo::{self, derive_room_id, KnownPeer, StoredRoom, StoredRoomMember};
20use crate::storage::{self, Db};
21
22pub use self::events::{AppEvent, DiscoveredRoom};
23
/// Lobby-facing view of a known dial peer: persisted address plus
/// runtime "is the connection currently up?" status.
#[derive(Debug, Clone)]
pub struct KnownPeerStatus {
    /// Address string the peer is stored under.
    pub address: String,
    /// Optional label copied from the stored peer record.
    pub label: Option<String>,
    /// `last_connected_at` value copied from the stored peer record, if any.
    pub last_connected_at: Option<i64>,
    /// Peer id recorded for this address after a successful dial in this
    /// process; `None` when no such entry exists.
    pub connected_peer_id: Option<PeerId>,
}
33
34/// Parse a user-entered dial address into a libp2p `Multiaddr`.
35/// Accepts `ip:port`, `[ipv6]:port`, or a raw multiaddr starting with `/`.
36pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
37    let trimmed = input.trim();
38    if trimmed.is_empty() {
39        return Err(HuddleError::Other("address is empty".into()));
40    }
41    if trimmed.starts_with('/') {
42        return trimmed
43            .parse::<Multiaddr>()
44            .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
45    }
46    if let Some(rest) = trimmed.strip_prefix('[') {
47        let (host, port) = rest
48            .split_once("]:")
49            .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
50        let port: u16 = port
51            .parse()
52            .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
53        return format!("/ip6/{}/tcp/{}", host, port)
54            .parse::<Multiaddr>()
55            .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
56    }
57    let (host, port) = trimmed
58        .rsplit_once(':')
59        .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
60    if host.contains(':') {
61        return Err(HuddleError::Other(format!(
62            "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
63        )));
64    }
65    let port: u16 = port
66        .parse()
67        .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
68    format!("/ip4/{}/tcp/{}", host, port)
69        .parse::<Multiaddr>()
70        .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
71}
72
/// State for a room we've created or joined this session.
struct ActiveRoom {
    /// Room record as it was written to storage when the room was created
    /// or joined.
    info: StoredRoom,
    /// Per-room crypto state; `None` for unencrypted rooms.
    crypto: Option<RoomCrypto>,
    /// Argon2id-derived 32-byte key for unwrapping incoming session keys.
    /// None for unencrypted rooms.
    passphrase_key: Option<[u8; KEY_LEN]>,
    /// Fingerprints of members currently known to be in the room.
    members: HashSet<String>,
}
83
/// TTL for a discovered room before it's considered stale (re-announcements
/// happen every 15 seconds; after 45s of silence we drop it).
/// The pruning task runs every 10 seconds, so a room can linger slightly
/// past this TTL before it is actually removed.
const DISCOVERED_TTL_SECS: i64 = 45;
/// Period, in seconds, of the ticker that re-announces every active room.
const ANNOUNCE_INTERVAL_SECS: u64 = 15;
88
/// Cloneable handle bundling our identity, the network handle, in-memory
/// room state, storage and the app-event broadcaster.
///
/// The room maps and the dial-address map sit behind `Arc<Mutex<..>>`, so
/// clones of the handle (including those moved into background tasks) share
/// the same state.
#[derive(Clone)]
pub struct AppHandle {
    /// Our identity, loaded from storage or generated on first start.
    identity: Arc<Identity>,
    /// Handle used to talk to the network layer (subscribe, publish, dial, ...).
    network: NetworkHandle,
    /// Network mode passed in at start-up.
    mode: NetworkMode,
    /// Rooms created or joined in this session, keyed by room id.
    active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
    /// Rooms learned from received announcements, keyed by room id.
    discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
    /// Peer addresses we've dialed in this process; tracks "is the
    /// connection currently up" for known peers shown in the lobby.
    /// Entries are inserted when a dial succeeds, keyed by the address string.
    connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
    /// Storage handle passed to the `repo` functions.
    db: Db,
    /// Sender side of the `AppEvent` broadcast channel; `subscribe` hands
    /// out receivers.
    app_event_tx: broadcast::Sender<AppEvent>,
}
102
103impl AppHandle {
    /// Start the app with the default options: `NetworkMode::Mdns` and
    /// port `0`. Delegates to `start_with_options`.
    pub async fn start() -> Result<Self> {
        Self::start_with_options(NetworkMode::Mdns, 0).await
    }
107
108    pub async fn start_with_options(mode: NetworkMode, port: u16) -> Result<Self> {
109        config::ensure_data_dir()?;
110        let db = storage::open_db(&config::db_path())?;
111        Self::start_with_db_and_options(db, mode, port).await
112    }
113
    /// Start the app on an already-opened database using the default
    /// options (`NetworkMode::Mdns`, port `0`).
    pub async fn start_with_db(db: Db) -> Result<Self> {
        Self::start_with_db_and_options(db, NetworkMode::Mdns, 0).await
    }
117
118    pub async fn start_with_db_and_options(
119        db: Db,
120        mode: NetworkMode,
121        port: u16,
122    ) -> Result<Self> {
123        let identity = Self::load_or_create_identity(&db)?;
124        let identity = Arc::new(identity);
125        info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, "identity loaded");
126
127        let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
128        let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
129        let network = network::start_network_with(&identity, net_event_tx, mode, port)?;
130
131        let active_rooms = Arc::new(Mutex::new(HashMap::new()));
132        let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
133        let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
134
135        let handle = Self {
136            identity,
137            network,
138            mode,
139            active_rooms,
140            discovered_rooms,
141            connected_dial_addrs,
142            db,
143            app_event_tx,
144        };
145
146        handle.spawn_event_processor(net_event_rx);
147        handle.spawn_announcement_ticker();
148        handle.spawn_discovered_room_pruner();
149        handle.spawn_known_peer_reconnector();
150
151        Ok(handle)
152    }
153
    /// Network mode this handle was started with.
    pub fn mode(&self) -> NetworkMode {
        self.mode
    }
157
    /// Create a new receiver on the `AppEvent` broadcast channel.
    pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
        self.app_event_tx.subscribe()
    }
161
    /// Fingerprint of our own identity.
    pub fn fingerprint(&self) -> &str {
        self.identity.fingerprint()
    }
165
    /// Peer id of our own identity.
    pub fn peer_id(&self) -> PeerId {
        self.identity.peer_id()
    }
169
170    pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
171        let map = self.discovered_rooms.lock().unwrap();
172        let mut v: Vec<DiscoveredRoom> = map.values().cloned().collect();
173        v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
174        v
175    }
176
177    pub fn active_room_ids(&self) -> Vec<String> {
178        self.active_rooms.lock().unwrap().keys().cloned().collect()
179    }
180
181    pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
182        self.active_rooms
183            .lock()
184            .unwrap()
185            .get(room_id)
186            .map(|r| r.info.clone())
187    }
188
189    pub fn room_members(&self, room_id: &str) -> Vec<String> {
190        self.active_rooms
191            .lock()
192            .unwrap()
193            .get(room_id)
194            .map(|r| {
195                let mut m: Vec<String> = r.members.iter().cloned().collect();
196                m.sort();
197                m
198            })
199            .unwrap_or_default()
200    }
201
    /// Fetch stored messages for `room_id` via `repo::get_room_messages`,
    /// passing `limit` through unchanged.
    pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
        repo::get_room_messages(&self.db, room_id, limit)
    }
205
206    /// Create a new room. Returns its room_id.
207    pub async fn start_room(
208        &self,
209        name: &str,
210        encrypted: bool,
211        passphrase: Option<&str>,
212    ) -> Result<String> {
213        if encrypted && passphrase.is_none() {
214            return Err(HuddleError::Other(
215                "encrypted room requires a passphrase".into(),
216            ));
217        }
218
219        let created_at = now_unix();
220        let creator_fp = self.identity.fingerprint().to_string();
221        let room_id = derive_room_id(&creator_fp, name, created_at);
222
223        let (passphrase_salt, passphrase_key) = if encrypted {
224            let salt = passphrase::random_salt();
225            let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
226            (Some(salt.to_vec()), Some(key))
227        } else {
228            (None, None)
229        };
230
231        let info = StoredRoom {
232            id: room_id.clone(),
233            name: name.to_string(),
234            creator_fingerprint: creator_fp.clone(),
235            encrypted,
236            passphrase_salt: passphrase_salt.clone(),
237            created_at,
238            last_active: Some(created_at),
239        };
240        repo::insert_room(&self.db, &info)?;
241
242        let crypto = if encrypted {
243            Some(RoomCrypto::new_for_room(
244                self.db.clone(),
245                room_id.clone(),
246                creator_fp.clone(),
247            )?)
248        } else {
249            None
250        };
251
252        let mut members = HashSet::new();
253        members.insert(creator_fp.clone());
254
255        self.active_rooms.lock().unwrap().insert(
256            room_id.clone(),
257            ActiveRoom {
258                info: info.clone(),
259                crypto,
260                passphrase_key,
261                members,
262            },
263        );
264
265        self.network.subscribe_room(room_id.clone()).await;
266        self.announce_room_now(&info, 1).await;
267
268        // Broadcast our presence in the room (with our wrapped session key
269        // if encrypted). Use a small delay so the subscription propagates.
270        let app = self.clone();
271        let rid = room_id.clone();
272        tokio::spawn(async move {
273            tokio::time::sleep(Duration::from_millis(500)).await;
274            if let Err(e) = app.broadcast_member_announce(&rid).await {
275                warn!(%e, "broadcast member announce");
276            }
277        });
278
279        let _ = self.app_event_tx.send(AppEvent::RoomJoined {
280            room_id: room_id.clone(),
281        });
282
283        Ok(room_id)
284    }
285
286    /// Join an existing discovered room.
287    pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
288        let discovered = self
289            .discovered_rooms
290            .lock()
291            .unwrap()
292            .get(room_id)
293            .cloned()
294            .ok_or_else(|| HuddleError::Other(format!("room {room_id} not discovered")))?;
295
296        if discovered.encrypted && passphrase.is_none() {
297            return Err(HuddleError::Other(
298                "encrypted room requires a passphrase".into(),
299            ));
300        }
301
302        // Re-fetch the announcement so we get the salt.
303        // (discovered_rooms stores the DiscoveredRoom struct which doesn't carry salt;
304        // we cache the salt separately in announcement_salts.)
305        let salt_opt = self.get_room_salt(room_id);
306
307        let passphrase_key = if discovered.encrypted {
308            let salt = salt_opt
309                .clone()
310                .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
311            Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
312        } else {
313            None
314        };
315
316        let info = StoredRoom {
317            id: room_id.to_string(),
318            name: discovered.name.clone(),
319            creator_fingerprint: discovered.creator_fingerprint.clone(),
320            encrypted: discovered.encrypted,
321            passphrase_salt: salt_opt.clone(),
322            created_at: now_unix(),
323            last_active: Some(now_unix()),
324        };
325        repo::insert_room(&self.db, &info)?;
326
327        let crypto = if discovered.encrypted {
328            Some(RoomCrypto::new_for_room(
329                self.db.clone(),
330                room_id.to_string(),
331                self.identity.fingerprint().to_string(),
332            )?)
333        } else {
334            None
335        };
336
337        let mut members = HashSet::new();
338        members.insert(self.identity.fingerprint().to_string());
339
340        self.active_rooms.lock().unwrap().insert(
341            room_id.to_string(),
342            ActiveRoom {
343                info: info.clone(),
344                crypto,
345                passphrase_key,
346                members,
347            },
348        );
349
350        self.network.subscribe_room(room_id.to_string()).await;
351
352        let app = self.clone();
353        let rid = room_id.to_string();
354        tokio::spawn(async move {
355            tokio::time::sleep(Duration::from_millis(500)).await;
356            if let Err(e) = app.broadcast_member_announce(&rid).await {
357                warn!(%e, "broadcast member announce");
358            }
359            // Ask existing members for their session keys.
360            let req = RoomMessage::SessionKeyRequest {
361                requester_fingerprint: app.identity.fingerprint().to_string(),
362            };
363            if let Ok(bytes) = serde_json::to_vec(&req) {
364                app.network.publish_room_message(rid.clone(), bytes).await;
365            }
366        });
367
368        let _ = self.app_event_tx.send(AppEvent::RoomJoined {
369            room_id: room_id.to_string(),
370        });
371
372        Ok(())
373    }
374
375    pub async fn leave_room(&self, room_id: &str) -> Result<()> {
376        // Broadcast a leave message before unsubscribing.
377        let leave_msg = RoomMessage::MemberLeave {
378            sender_fingerprint: self.identity.fingerprint().to_string(),
379        };
380        if let Ok(bytes) = serde_json::to_vec(&leave_msg) {
381            self.network
382                .publish_room_message(room_id.to_string(), bytes)
383                .await;
384        }
385
386        self.active_rooms.lock().unwrap().remove(room_id);
387        self.network.unsubscribe_room(room_id.to_string()).await;
388
389        let _ = self.app_event_tx.send(AppEvent::RoomLeft {
390            room_id: room_id.to_string(),
391        });
392        Ok(())
393    }
394
395    pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
396        let our_fp = self.identity.fingerprint().to_string();
397        let msg = {
398            let mut rooms = self.active_rooms.lock().unwrap();
399            let room = rooms
400                .get_mut(room_id)
401                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
402
403            if room.info.encrypted {
404                let crypto = room
405                    .crypto
406                    .as_mut()
407                    .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
408                let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
409                RoomMessage::Encrypted {
410                    sender_fingerprint: our_fp.clone(),
411                    session_id,
412                    ciphertext_b64: base64::Engine::encode(
413                        &base64::engine::general_purpose::STANDARD,
414                        &ct_bytes,
415                    ),
416                }
417            } else {
418                RoomMessage::Plain {
419                    sender_fingerprint: our_fp.clone(),
420                    body: body.to_string(),
421                }
422            }
423        };
424
425        let bytes = serde_json::to_vec(&msg)?;
426        self.network
427            .publish_room_message(room_id.to_string(), bytes)
428            .await;
429
430        let now = now_unix();
431        let msg_id =
432            repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
433        repo::update_room_last_active(&self.db, room_id, now)?;
434
435        let _ = self.app_event_tx.send(AppEvent::MessageSent {
436            room_id: room_id.to_string(),
437            body: body.to_string(),
438            message_id: msg_id,
439        });
440
441        Ok(())
442    }
443
    /// Ask the network layer to shut down and wait for that call to return.
    pub async fn shutdown(&self) {
        self.network.shutdown().await;
    }
447
448    // -------------------------------------------------------------------
449    // Dial / known peers
450    // -------------------------------------------------------------------
451
452    /// Dial a peer by a user-entered address. Accepts:
453    /// - `1.2.3.4:9000`
454    /// - `[fe80::1]:9000`
455    /// - `/ip4/.../tcp/...[/p2p/<peer>]` (raw multiaddr)
456    pub async fn dial(&self, input: &str) -> Result<()> {
457        let multiaddr = parse_dial_address(input)?;
458        let canonical = multiaddr.to_string();
459        info!(%canonical, "dialing");
460
461        repo::upsert_known_peer(
462            &self.db,
463            &KnownPeer {
464                address: canonical.clone(),
465                label: None,
466                last_connected_at: None,
467                last_attempt_at: Some(now_unix()),
468                created_at: now_unix(),
469            },
470        )?;
471
472        let _ = self.app_event_tx.send(AppEvent::Dialing {
473            address: canonical.clone(),
474        });
475        self.network.dial(multiaddr).await;
476        Ok(())
477    }
478
479    pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
480        let connected = self.connected_dial_addrs.lock().unwrap().clone();
481        let stored = repo::list_known_peers(&self.db).unwrap_or_default();
482        stored
483            .into_iter()
484            .map(|p| {
485                let connected_peer = connected.get(&p.address).copied();
486                KnownPeerStatus {
487                    address: p.address,
488                    label: p.label,
489                    last_connected_at: p.last_connected_at,
490                    connected_peer_id: connected_peer,
491                }
492            })
493            .collect()
494    }
495
496    pub async fn forget_peer(&self, address: &str) -> Result<()> {
497        repo::forget_known_peer(&self.db, address)?;
498        self.connected_dial_addrs.lock().unwrap().remove(address);
499        Ok(())
500    }
501
    /// Re-dial a stored address — used by the lobby's "reconnect" action.
    /// Forwards to `dial`, so the address is parsed again and the
    /// known-peer record is upserted again.
    pub async fn redial(&self, address: &str) -> Result<()> {
        self.dial(address).await
    }
506
507    fn spawn_known_peer_reconnector(&self) {
508        let handle = self.clone();
509        tokio::spawn(async move {
510            // Brief delay so listeners come up first.
511            tokio::time::sleep(Duration::from_millis(500)).await;
512            let known = repo::list_known_peers(&handle.db).unwrap_or_default();
513            for peer in known {
514                if let Err(e) = handle.dial(&peer.address).await {
515                    debug!(%e, addr = %peer.address, "auto-reconnect failed");
516                }
517            }
518        });
519    }
520
521    // -------------------------------------------------------------------
522    // Internal helpers
523    // -------------------------------------------------------------------
524
525    fn load_or_create_identity(db: &Db) -> Result<Identity> {
526        if let Some(stored) = repo::load_identity(db)? {
527            let mut bytes = [0u8; 32];
528            bytes.copy_from_slice(&stored.ed25519_secret);
529            Identity::from_secret_bytes(bytes)
530        } else {
531            let id = Identity::generate()?;
532            repo::save_identity(db, &id.secret_bytes(), now_unix())?;
533            Ok(id)
534        }
535    }
536
537    fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
538        self.active_rooms
539            .lock()
540            .unwrap()
541            .get(room_id)
542            .and_then(|r| r.info.passphrase_salt.clone())
543            .or_else(|| {
544                // Try the cached announcement salt
545                ROOM_SALT_CACHE
546                    .lock()
547                    .unwrap()
548                    .get(room_id)
549                    .cloned()
550            })
551    }
552
553    async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
554        let ann = RoomAnnouncement {
555            room_id: info.id.clone(),
556            name: info.name.clone(),
557            encrypted: info.encrypted,
558            passphrase_salt: info.passphrase_salt.clone(),
559            member_count,
560            creator_fingerprint: info.creator_fingerprint.clone(),
561            announced_at: now_unix(),
562        };
563        self.network.announce_room(ann).await;
564    }
565
566    async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
567        let our_fp = self.identity.fingerprint().to_string();
568        let wrapped = {
569            let mut rooms = self.active_rooms.lock().unwrap();
570            let room = rooms
571                .get_mut(room_id)
572                .ok_or_else(|| HuddleError::Other("not in room".into()))?;
573            if room.info.encrypted {
574                let crypto = room.crypto.as_mut().unwrap();
575                let session_key = crypto.our_session_key_b64();
576                let passphrase_key = room
577                    .passphrase_key
578                    .as_ref()
579                    .ok_or_else(|| HuddleError::Session("missing passphrase key".into()))?;
580                Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
581            } else {
582                None
583            }
584        };
585        let msg = RoomMessage::MemberAnnounce {
586            sender_fingerprint: our_fp,
587            wrapped_session_key: wrapped,
588        };
589        let bytes = serde_json::to_vec(&msg)?;
590        self.network
591            .publish_room_message(room_id.to_string(), bytes)
592            .await;
593        Ok(())
594    }
595
596    fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
597        let handle = self.clone();
598        tokio::spawn(async move {
599            while let Some(event) = net_rx.recv().await {
600                handle.process_network_event(event).await;
601            }
602            info!("event processor stopped");
603        });
604    }
605
606    fn spawn_announcement_ticker(&self) {
607        let handle = self.clone();
608        tokio::spawn(async move {
609            let mut interval =
610                tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
611            interval.tick().await; // skip the immediate tick
612            loop {
613                interval.tick().await;
614                let snapshot: Vec<(StoredRoom, u32)> = {
615                    let active = handle.active_rooms.lock().unwrap();
616                    active
617                        .values()
618                        .map(|r| (r.info.clone(), r.members.len() as u32))
619                        .collect()
620                };
621                for (info, member_count) in snapshot {
622                    handle.announce_room_now(&info, member_count).await;
623                }
624            }
625        });
626    }
627
628    fn spawn_discovered_room_pruner(&self) {
629        let handle = self.clone();
630        tokio::spawn(async move {
631            let mut interval = tokio::time::interval(Duration::from_secs(10));
632            interval.tick().await;
633            loop {
634                interval.tick().await;
635                let now = now_unix();
636                let mut to_drop = Vec::new();
637                {
638                    let mut map = handle.discovered_rooms.lock().unwrap();
639                    map.retain(|id, r| {
640                        if now - r.last_seen > DISCOVERED_TTL_SECS {
641                            to_drop.push(id.clone());
642                            false
643                        } else {
644                            true
645                        }
646                    });
647                }
648                for id in to_drop {
649                    let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
650                }
651            }
652        });
653    }
654
655    async fn process_network_event(&self, event: NetworkEvent) {
656        match event {
657            NetworkEvent::PeerDiscovered { peer_id } => {
658                let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
659            }
660            NetworkEvent::PeerExpired { .. } => {}
661            NetworkEvent::ListeningOn { address } => {
662                let _ = self.app_event_tx.send(AppEvent::ListeningOn {
663                    address: address.to_string(),
664                });
665            }
666            NetworkEvent::RoomAnnouncementReceived(ann) => {
667                let our_fp = self.identity.fingerprint();
668                // Cache the salt for join_room
669                if let Some(salt) = &ann.passphrase_salt {
670                    ROOM_SALT_CACHE
671                        .lock()
672                        .unwrap()
673                        .insert(ann.room_id.clone(), salt.clone());
674                }
675                let discovered = DiscoveredRoom {
676                    room_id: ann.room_id.clone(),
677                    name: ann.name.clone(),
678                    encrypted: ann.encrypted,
679                    member_count: ann.member_count,
680                    creator_fingerprint: ann.creator_fingerprint.clone(),
681                    last_seen: now_unix(),
682                };
683                // Skip our own announcements
684                if ann.creator_fingerprint == our_fp
685                    && self.active_rooms.lock().unwrap().contains_key(&ann.room_id)
686                {
687                    // It's our room — still cache it so others can join, but don't emit.
688                    self.discovered_rooms
689                        .lock()
690                        .unwrap()
691                        .insert(ann.room_id.clone(), discovered);
692                    return;
693                }
694                self.discovered_rooms
695                    .lock()
696                    .unwrap()
697                    .insert(ann.room_id.clone(), discovered.clone());
698                let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
699            }
700            NetworkEvent::RoomMessageReceived {
701                room_id,
702                payload,
703                from_peer: _,
704            } => {
705                let msg: RoomMessage = match serde_json::from_slice(&payload) {
706                    Ok(m) => m,
707                    Err(e) => {
708                        warn!(%e, "bad room message");
709                        return;
710                    }
711                };
712                self.handle_room_message(&room_id, msg).await;
713            }
714            NetworkEvent::DialSucceeded { peer_id, address } => {
715                let addr_s = address.to_string();
716                self.connected_dial_addrs
717                    .lock()
718                    .unwrap()
719                    .insert(addr_s.clone(), peer_id);
720                let _ = repo::upsert_known_peer(
721                    &self.db,
722                    &KnownPeer {
723                        address: addr_s.clone(),
724                        label: None,
725                        last_connected_at: Some(now_unix()),
726                        last_attempt_at: Some(now_unix()),
727                        created_at: now_unix(),
728                    },
729                );
730                let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
731                    address: addr_s,
732                    peer_id,
733                });
734            }
735            NetworkEvent::DialFailed { address, error } => {
736                let addr_s = address.to_string();
737                let _ = self.app_event_tx.send(AppEvent::DialFailed {
738                    address: addr_s,
739                    error,
740                });
741            }
742        }
743    }
744
745    async fn handle_room_message(&self, room_id: &str, msg: RoomMessage) {
746        let our_fp = self.identity.fingerprint().to_string();
747        match msg {
748            RoomMessage::MemberAnnounce {
749                sender_fingerprint,
750                wrapped_session_key,
751            } => {
752                if sender_fingerprint == our_fp {
753                    return;
754                }
755                let need_inbound = {
756                    let mut rooms = self.active_rooms.lock().unwrap();
757                    let room = match rooms.get_mut(room_id) {
758                        Some(r) => r,
759                        None => return,
760                    };
761                    let newly_added = room.members.insert(sender_fingerprint.clone());
762                    if newly_added {
763                        let _ = self.app_event_tx.send(AppEvent::MemberJoined {
764                            room_id: room_id.to_string(),
765                            fingerprint: sender_fingerprint.clone(),
766                        });
767                    }
768                    // Persist member
769                    let _ = repo::upsert_room_member(
770                        &self.db,
771                        &StoredRoomMember {
772                            room_id: room_id.to_string(),
773                            peer_id: String::new(), // unknown at this layer
774                            fingerprint: sender_fingerprint.clone(),
775                            last_seen: Some(now_unix()),
776                        },
777                    );
778                    room.info.encrypted && wrapped_session_key.is_some()
779                };
780
781                if need_inbound {
782                    let wrapped = wrapped_session_key.unwrap();
783                    let result = {
784                        let mut rooms = self.active_rooms.lock().unwrap();
785                        let room = rooms.get_mut(room_id).unwrap();
786                        let passphrase_key = match &room.passphrase_key {
787                            Some(k) => k,
788                            None => {
789                                warn!("no passphrase key when receiving session key");
790                                return;
791                            }
792                        };
793                        match passphrase::unwrap(&wrapped, passphrase_key) {
794                            Ok(plain) => match String::from_utf8(plain) {
795                                Ok(key_b64) => {
796                                    let crypto = room.crypto.as_mut().unwrap();
797                                    crypto.add_inbound_session(&sender_fingerprint, &key_b64)
798                                }
799                                Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
800                            },
801                            Err(e) => Err(e),
802                        }
803                    };
804                    if let Err(e) = result {
805                        error!(%e, "add inbound session failed");
806                    }
807                }
808            }
809            RoomMessage::SessionKeyRequest {
810                requester_fingerprint,
811            } => {
812                if requester_fingerprint == our_fp {
813                    return;
814                }
815                // Re-announce ourselves to share our session key with the new joiner.
816                if let Err(e) = self.broadcast_member_announce(room_id).await {
817                    warn!(%e, "broadcast member announce on request");
818                }
819            }
820            RoomMessage::Encrypted {
821                sender_fingerprint,
822                session_id,
823                ciphertext_b64,
824            } => {
825                if sender_fingerprint == our_fp {
826                    return;
827                }
828                let ct_bytes = match base64::Engine::decode(
829                    &base64::engine::general_purpose::STANDARD,
830                    &ciphertext_b64,
831                ) {
832                    Ok(b) => b,
833                    Err(e) => {
834                        warn!(%e, "bad base64 ciphertext");
835                        return;
836                    }
837                };
838                let plaintext = {
839                    let mut rooms = self.active_rooms.lock().unwrap();
840                    let room = match rooms.get_mut(room_id) {
841                        Some(r) => r,
842                        None => return,
843                    };
844                    let crypto = match room.crypto.as_mut() {
845                        Some(c) => c,
846                        None => return,
847                    };
848                    crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
849                };
850                match plaintext {
851                    Ok(pt) => {
852                        let body = String::from_utf8_lossy(&pt).to_string();
853                        let sent_at = now_unix();
854                        let _ = repo::insert_room_message(
855                            &self.db,
856                            room_id,
857                            &sender_fingerprint,
858                            "in",
859                            &body,
860                            sent_at,
861                        );
862                        let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
863                        let _ = self.app_event_tx.send(AppEvent::MessageReceived {
864                            room_id: room_id.to_string(),
865                            sender_fingerprint,
866                            body,
867                            sent_at,
868                        });
869                    }
870                    Err(e) => {
871                        debug!(%e, "decrypt failed (probably missing session key)");
872                    }
873                }
874            }
875            RoomMessage::Plain {
876                sender_fingerprint,
877                body,
878            } => {
879                if sender_fingerprint == our_fp {
880                    return;
881                }
882                let sent_at = now_unix();
883                let _ = repo::insert_room_message(
884                    &self.db,
885                    room_id,
886                    &sender_fingerprint,
887                    "in",
888                    &body,
889                    sent_at,
890                );
891                let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
892                let _ = self.app_event_tx.send(AppEvent::MessageReceived {
893                    room_id: room_id.to_string(),
894                    sender_fingerprint,
895                    body,
896                    sent_at,
897                });
898            }
899            RoomMessage::MemberLeave { sender_fingerprint } => {
900                if sender_fingerprint == our_fp {
901                    return;
902                }
903                let removed = {
904                    let mut rooms = self.active_rooms.lock().unwrap();
905                    if let Some(room) = rooms.get_mut(room_id) {
906                        room.members.remove(&sender_fingerprint)
907                    } else {
908                        false
909                    }
910                };
911                if removed {
912                    let _ = self.app_event_tx.send(AppEvent::MemberLeft {
913                        room_id: room_id.to_string(),
914                        fingerprint: sender_fingerprint,
915                    });
916                }
917            }
918        }
919    }
920}
921
/// Module-level salt cache mapping `room_id` -> passphrase salt bytes.
///
/// Populated when we receive announcements; queried by `join_room`.
/// The map is created empty on first access and guarded by a `Mutex`, so
/// every reader and writer must take the lock before touching it.
static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
    std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
926
927#[allow(dead_code)]
928fn salt_len() -> usize {
929    SALT_LEN
930}
931
/// Current wall-clock time as whole seconds relative to the Unix epoch.
///
/// Never panics: this is called while handling network messages, where a
/// misconfigured system clock must not take the process down.
///
/// * Clock at or after the epoch: the elapsed seconds, saturating at
///   `i64::MAX` instead of wrapping.
/// * Clock before the epoch: the (negative) number of whole seconds before
///   the epoch, saturating at `i64::MIN`.
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        // `SystemTimeError::duration` reports how far *before* the epoch the
        // clock is, so negate it to keep timestamps ordered.
        Err(before_epoch) => {
            i64::try_from(before_epoch.duration().as_secs()).map_or(i64::MIN, |secs| -secs)
        }
    }
}
938
/// Unit tests for `parse_dial_address`: the accepted input shapes
/// (`ip:port`, `[ipv6]:port`, raw multiaddr) and the rejected ones.
#[cfg(test)]
mod parser_tests {
    use super::parse_dial_address;

    /// `ip:port` is converted to an `/ip4/.../tcp/...` multiaddr.
    #[test]
    fn parses_ipv4_port() {
        let m = parse_dial_address("10.3.72.53:9027").unwrap();
        assert_eq!(m.to_string(), "/ip4/10.3.72.53/tcp/9027");
    }

    /// `[ipv6]:port` is converted to an `/ip6/.../tcp/...` multiaddr.
    #[test]
    fn parses_bracketed_ipv6() {
        let m = parse_dial_address("[::1]:9027").unwrap();
        assert_eq!(m.to_string(), "/ip6/::1/tcp/9027");
    }

    /// IPv6 without brackets is ambiguous; the error should mention brackets.
    #[test]
    fn rejects_unbracketed_ipv6() {
        let err = parse_dial_address("fe80::1:9027").unwrap_err();
        assert!(err.to_string().contains("brackets"));
    }

    /// A bracketed IPv6 host with no `]:port` suffix is not a dial address.
    #[test]
    fn rejects_bracketed_ipv6_without_port() {
        assert!(parse_dial_address("[::1]").is_err());
    }

    /// Input starting with `/` is parsed as a multiaddr unchanged.
    #[test]
    fn passes_through_raw_multiaddr() {
        let m = parse_dial_address("/ip4/1.2.3.4/tcp/9000").unwrap();
        assert_eq!(m.to_string(), "/ip4/1.2.3.4/tcp/9000");
    }

    /// Empty and whitespace-only input is rejected after trimming.
    #[test]
    fn empty_address_is_error() {
        assert!(parse_dial_address("").is_err());
        assert!(parse_dial_address("   ").is_err());
    }

    /// A non-numeric port is rejected.
    #[test]
    fn rejects_bad_port() {
        assert!(parse_dial_address("1.2.3.4:notaport").is_err());
    }
}