1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use libp2p::{Multiaddr, PeerId};
8use tokio::sync::broadcast;
9use tracing::{debug, error, info, warn};
10
11use crate::config;
12use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
13use crate::crypto::RoomCrypto;
14use crate::error::{HuddleError, Result};
15use crate::identity::Identity;
16use crate::network::events::NetworkEvent;
17use crate::network::protocol::{RoomAnnouncement, RoomMessage};
18use crate::network::{self, NetworkHandle, NetworkMode};
19use crate::storage::repo::{self, derive_room_id, KnownPeer, StoredRoom, StoredRoomMember};
20use crate::storage::{self, Db};
21
22pub use self::events::{AppEvent, DiscoveredRoom};
23
/// Snapshot of one remembered dial address: the stored record combined with
/// the live connection state tracked by `AppHandle`.
#[derive(Debug, Clone)]
pub struct KnownPeerStatus {
    /// Address string the peer is stored under; also the key used to look up
    /// the live connection in `AppHandle::known_peers`.
    pub address: String,
    /// Optional label copied from the stored record.
    pub label: Option<String>,
    /// `last_connected_at` value copied from the stored record, if any.
    pub last_connected_at: Option<i64>,
    /// Peer id currently associated with this address, or `None` when the
    /// address has no entry in the connected-address map.
    pub connected_peer_id: Option<PeerId>,
}
33
34pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
37 let trimmed = input.trim();
38 if trimmed.is_empty() {
39 return Err(HuddleError::Other("address is empty".into()));
40 }
41 if trimmed.starts_with('/') {
42 return trimmed
43 .parse::<Multiaddr>()
44 .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
45 }
46 if let Some(rest) = trimmed.strip_prefix('[') {
47 let (host, port) = rest
48 .split_once("]:")
49 .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
50 let port: u16 = port
51 .parse()
52 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
53 return format!("/ip6/{}/tcp/{}", host, port)
54 .parse::<Multiaddr>()
55 .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
56 }
57 let (host, port) = trimmed
58 .rsplit_once(':')
59 .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
60 if host.contains(':') {
61 return Err(HuddleError::Other(format!(
62 "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
63 )));
64 }
65 let port: u16 = port
66 .parse()
67 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
68 format!("/ip4/{}/tcp/{}", host, port)
69 .parse::<Multiaddr>()
70 .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
71}
72
/// In-memory state for a room this node is currently participating in.
struct ActiveRoom {
    /// Stored room record (id, name, creator, encryption flag, salt, timestamps).
    info: StoredRoom,
    /// Room crypto state; created for encrypted rooms, `None` otherwise.
    crypto: Option<RoomCrypto>,
    /// Key derived from the room passphrase; set for encrypted rooms only.
    passphrase_key: Option<[u8; KEY_LEN]>,
    /// Fingerprints of the members seen in this room, including our own.
    members: HashSet<String>,
}
83
/// Seconds a discovered room may go without a fresh announcement before the
/// pruner drops it and emits `AppEvent::RoomLost`.
const DISCOVERED_TTL_SECS: i64 = 45;
/// Seconds between periodic re-announcements of every active room.
const ANNOUNCE_INTERVAL_SECS: u64 = 15;
88
/// Cloneable handle to the running application.
///
/// Mutable state sits behind `Arc<Mutex<..>>`, so clones (including the ones
/// moved into background tasks) observe the same rooms and peers.
#[derive(Clone)]
pub struct AppHandle {
    /// Local identity; source of our fingerprint and peer id.
    identity: Arc<Identity>,
    /// Handle used to send commands to the network layer.
    network: NetworkHandle,
    /// Network mode this handle was started with.
    mode: NetworkMode,
    /// Rooms we are currently in, keyed by room id.
    active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
    /// Rooms learned from announcements, keyed by room id.
    discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
    /// Dialed address string -> peer id, recorded when a dial succeeds.
    connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
    /// Database handle used for all persistence.
    db: Db,
    /// Broadcast channel on which `AppEvent`s are published to subscribers.
    app_event_tx: broadcast::Sender<AppEvent>,
}
102
103impl AppHandle {
104 pub async fn start() -> Result<Self> {
105 Self::start_with_options(NetworkMode::Mdns, 0).await
106 }
107
108 pub async fn start_with_options(mode: NetworkMode, port: u16) -> Result<Self> {
109 config::ensure_data_dir()?;
110 let db = storage::open_db(&config::db_path())?;
111 Self::start_with_db_and_options(db, mode, port).await
112 }
113
114 pub async fn start_with_db(db: Db) -> Result<Self> {
115 Self::start_with_db_and_options(db, NetworkMode::Mdns, 0).await
116 }
117
118 pub async fn start_with_db_and_options(
119 db: Db,
120 mode: NetworkMode,
121 port: u16,
122 ) -> Result<Self> {
123 let identity = Self::load_or_create_identity(&db)?;
124 let identity = Arc::new(identity);
125 info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, "identity loaded");
126
127 let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
128 let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
129 let network = network::start_network_with(&identity, net_event_tx, mode, port)?;
130
131 let active_rooms = Arc::new(Mutex::new(HashMap::new()));
132 let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
133 let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
134
135 let handle = Self {
136 identity,
137 network,
138 mode,
139 active_rooms,
140 discovered_rooms,
141 connected_dial_addrs,
142 db,
143 app_event_tx,
144 };
145
146 handle.spawn_event_processor(net_event_rx);
147 handle.spawn_announcement_ticker();
148 handle.spawn_discovered_room_pruner();
149 handle.spawn_known_peer_reconnector();
150
151 Ok(handle)
152 }
153
154 pub fn mode(&self) -> NetworkMode {
155 self.mode
156 }
157
158 pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
159 self.app_event_tx.subscribe()
160 }
161
162 pub fn fingerprint(&self) -> &str {
163 self.identity.fingerprint()
164 }
165
166 pub fn peer_id(&self) -> PeerId {
167 self.identity.peer_id()
168 }
169
170 pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
171 let map = self.discovered_rooms.lock().unwrap();
172 let mut v: Vec<DiscoveredRoom> = map.values().cloned().collect();
173 v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
174 v
175 }
176
177 pub fn active_room_ids(&self) -> Vec<String> {
178 self.active_rooms.lock().unwrap().keys().cloned().collect()
179 }
180
181 pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
182 self.active_rooms
183 .lock()
184 .unwrap()
185 .get(room_id)
186 .map(|r| r.info.clone())
187 }
188
189 pub fn room_members(&self, room_id: &str) -> Vec<String> {
190 self.active_rooms
191 .lock()
192 .unwrap()
193 .get(room_id)
194 .map(|r| {
195 let mut m: Vec<String> = r.members.iter().cloned().collect();
196 m.sort();
197 m
198 })
199 .unwrap_or_default()
200 }
201
202 pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
203 repo::get_room_messages(&self.db, room_id, limit)
204 }
205
206 pub async fn start_room(
208 &self,
209 name: &str,
210 encrypted: bool,
211 passphrase: Option<&str>,
212 ) -> Result<String> {
213 if encrypted && passphrase.is_none() {
214 return Err(HuddleError::Other(
215 "encrypted room requires a passphrase".into(),
216 ));
217 }
218
219 let created_at = now_unix();
220 let creator_fp = self.identity.fingerprint().to_string();
221 let room_id = derive_room_id(&creator_fp, name, created_at);
222
223 let (passphrase_salt, passphrase_key) = if encrypted {
224 let salt = passphrase::random_salt();
225 let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
226 (Some(salt.to_vec()), Some(key))
227 } else {
228 (None, None)
229 };
230
231 let info = StoredRoom {
232 id: room_id.clone(),
233 name: name.to_string(),
234 creator_fingerprint: creator_fp.clone(),
235 encrypted,
236 passphrase_salt: passphrase_salt.clone(),
237 created_at,
238 last_active: Some(created_at),
239 };
240 repo::insert_room(&self.db, &info)?;
241
242 let crypto = if encrypted {
243 Some(RoomCrypto::new_for_room(
244 self.db.clone(),
245 room_id.clone(),
246 creator_fp.clone(),
247 )?)
248 } else {
249 None
250 };
251
252 let mut members = HashSet::new();
253 members.insert(creator_fp.clone());
254
255 self.active_rooms.lock().unwrap().insert(
256 room_id.clone(),
257 ActiveRoom {
258 info: info.clone(),
259 crypto,
260 passphrase_key,
261 members,
262 },
263 );
264
265 self.network.subscribe_room(room_id.clone()).await;
266 self.announce_room_now(&info, 1).await;
267
268 let app = self.clone();
271 let rid = room_id.clone();
272 tokio::spawn(async move {
273 tokio::time::sleep(Duration::from_millis(500)).await;
274 if let Err(e) = app.broadcast_member_announce(&rid).await {
275 warn!(%e, "broadcast member announce");
276 }
277 });
278
279 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
280 room_id: room_id.clone(),
281 });
282
283 Ok(room_id)
284 }
285
286 pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
288 let discovered = self
289 .discovered_rooms
290 .lock()
291 .unwrap()
292 .get(room_id)
293 .cloned()
294 .ok_or_else(|| HuddleError::Other(format!("room {room_id} not discovered")))?;
295
296 if discovered.encrypted && passphrase.is_none() {
297 return Err(HuddleError::Other(
298 "encrypted room requires a passphrase".into(),
299 ));
300 }
301
302 let salt_opt = self.get_room_salt(room_id);
306
307 let passphrase_key = if discovered.encrypted {
308 let salt = salt_opt
309 .clone()
310 .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
311 Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
312 } else {
313 None
314 };
315
316 let info = StoredRoom {
317 id: room_id.to_string(),
318 name: discovered.name.clone(),
319 creator_fingerprint: discovered.creator_fingerprint.clone(),
320 encrypted: discovered.encrypted,
321 passphrase_salt: salt_opt.clone(),
322 created_at: now_unix(),
323 last_active: Some(now_unix()),
324 };
325 repo::insert_room(&self.db, &info)?;
326
327 let crypto = if discovered.encrypted {
328 Some(RoomCrypto::new_for_room(
329 self.db.clone(),
330 room_id.to_string(),
331 self.identity.fingerprint().to_string(),
332 )?)
333 } else {
334 None
335 };
336
337 let mut members = HashSet::new();
338 members.insert(self.identity.fingerprint().to_string());
339
340 self.active_rooms.lock().unwrap().insert(
341 room_id.to_string(),
342 ActiveRoom {
343 info: info.clone(),
344 crypto,
345 passphrase_key,
346 members,
347 },
348 );
349
350 self.network.subscribe_room(room_id.to_string()).await;
351
352 let app = self.clone();
353 let rid = room_id.to_string();
354 tokio::spawn(async move {
355 tokio::time::sleep(Duration::from_millis(500)).await;
356 if let Err(e) = app.broadcast_member_announce(&rid).await {
357 warn!(%e, "broadcast member announce");
358 }
359 let req = RoomMessage::SessionKeyRequest {
361 requester_fingerprint: app.identity.fingerprint().to_string(),
362 };
363 if let Ok(bytes) = serde_json::to_vec(&req) {
364 app.network.publish_room_message(rid.clone(), bytes).await;
365 }
366 });
367
368 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
369 room_id: room_id.to_string(),
370 });
371
372 Ok(())
373 }
374
375 pub async fn leave_room(&self, room_id: &str) -> Result<()> {
376 let leave_msg = RoomMessage::MemberLeave {
378 sender_fingerprint: self.identity.fingerprint().to_string(),
379 };
380 if let Ok(bytes) = serde_json::to_vec(&leave_msg) {
381 self.network
382 .publish_room_message(room_id.to_string(), bytes)
383 .await;
384 }
385
386 self.active_rooms.lock().unwrap().remove(room_id);
387 self.network.unsubscribe_room(room_id.to_string()).await;
388
389 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
390 room_id: room_id.to_string(),
391 });
392 Ok(())
393 }
394
395 pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
396 let our_fp = self.identity.fingerprint().to_string();
397 let msg = {
398 let mut rooms = self.active_rooms.lock().unwrap();
399 let room = rooms
400 .get_mut(room_id)
401 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
402
403 if room.info.encrypted {
404 let crypto = room
405 .crypto
406 .as_mut()
407 .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
408 let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
409 RoomMessage::Encrypted {
410 sender_fingerprint: our_fp.clone(),
411 session_id,
412 ciphertext_b64: base64::Engine::encode(
413 &base64::engine::general_purpose::STANDARD,
414 &ct_bytes,
415 ),
416 }
417 } else {
418 RoomMessage::Plain {
419 sender_fingerprint: our_fp.clone(),
420 body: body.to_string(),
421 }
422 }
423 };
424
425 let bytes = serde_json::to_vec(&msg)?;
426 self.network
427 .publish_room_message(room_id.to_string(), bytes)
428 .await;
429
430 let now = now_unix();
431 let msg_id =
432 repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
433 repo::update_room_last_active(&self.db, room_id, now)?;
434
435 let _ = self.app_event_tx.send(AppEvent::MessageSent {
436 room_id: room_id.to_string(),
437 body: body.to_string(),
438 message_id: msg_id,
439 });
440
441 Ok(())
442 }
443
444 pub async fn shutdown(&self) {
445 self.network.shutdown().await;
446 }
447
448 pub async fn dial(&self, input: &str) -> Result<()> {
457 let multiaddr = parse_dial_address(input)?;
458 let canonical = multiaddr.to_string();
459 info!(%canonical, "dialing");
460
461 repo::upsert_known_peer(
462 &self.db,
463 &KnownPeer {
464 address: canonical.clone(),
465 label: None,
466 last_connected_at: None,
467 last_attempt_at: Some(now_unix()),
468 created_at: now_unix(),
469 },
470 )?;
471
472 let _ = self.app_event_tx.send(AppEvent::Dialing {
473 address: canonical.clone(),
474 });
475 self.network.dial(multiaddr).await;
476 Ok(())
477 }
478
479 pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
480 let connected = self.connected_dial_addrs.lock().unwrap().clone();
481 let stored = repo::list_known_peers(&self.db).unwrap_or_default();
482 stored
483 .into_iter()
484 .map(|p| {
485 let connected_peer = connected.get(&p.address).copied();
486 KnownPeerStatus {
487 address: p.address,
488 label: p.label,
489 last_connected_at: p.last_connected_at,
490 connected_peer_id: connected_peer,
491 }
492 })
493 .collect()
494 }
495
496 pub async fn forget_peer(&self, address: &str) -> Result<()> {
497 repo::forget_known_peer(&self.db, address)?;
498 self.connected_dial_addrs.lock().unwrap().remove(address);
499 Ok(())
500 }
501
502 pub async fn redial(&self, address: &str) -> Result<()> {
504 self.dial(address).await
505 }
506
507 fn spawn_known_peer_reconnector(&self) {
508 let handle = self.clone();
509 tokio::spawn(async move {
510 tokio::time::sleep(Duration::from_millis(500)).await;
512 let known = repo::list_known_peers(&handle.db).unwrap_or_default();
513 for peer in known {
514 if let Err(e) = handle.dial(&peer.address).await {
515 debug!(%e, addr = %peer.address, "auto-reconnect failed");
516 }
517 }
518 });
519 }
520
521 fn load_or_create_identity(db: &Db) -> Result<Identity> {
526 if let Some(stored) = repo::load_identity(db)? {
527 let mut bytes = [0u8; 32];
528 bytes.copy_from_slice(&stored.ed25519_secret);
529 Identity::from_secret_bytes(bytes)
530 } else {
531 let id = Identity::generate()?;
532 repo::save_identity(db, &id.secret_bytes(), now_unix())?;
533 Ok(id)
534 }
535 }
536
537 fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
538 self.active_rooms
539 .lock()
540 .unwrap()
541 .get(room_id)
542 .and_then(|r| r.info.passphrase_salt.clone())
543 .or_else(|| {
544 ROOM_SALT_CACHE
546 .lock()
547 .unwrap()
548 .get(room_id)
549 .cloned()
550 })
551 }
552
553 async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
554 let ann = RoomAnnouncement {
555 room_id: info.id.clone(),
556 name: info.name.clone(),
557 encrypted: info.encrypted,
558 passphrase_salt: info.passphrase_salt.clone(),
559 member_count,
560 creator_fingerprint: info.creator_fingerprint.clone(),
561 announced_at: now_unix(),
562 };
563 self.network.announce_room(ann).await;
564 }
565
566 async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
567 let our_fp = self.identity.fingerprint().to_string();
568 let wrapped = {
569 let mut rooms = self.active_rooms.lock().unwrap();
570 let room = rooms
571 .get_mut(room_id)
572 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
573 if room.info.encrypted {
574 let crypto = room.crypto.as_mut().unwrap();
575 let session_key = crypto.our_session_key_b64();
576 let passphrase_key = room
577 .passphrase_key
578 .as_ref()
579 .ok_or_else(|| HuddleError::Session("missing passphrase key".into()))?;
580 Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
581 } else {
582 None
583 }
584 };
585 let msg = RoomMessage::MemberAnnounce {
586 sender_fingerprint: our_fp,
587 wrapped_session_key: wrapped,
588 };
589 let bytes = serde_json::to_vec(&msg)?;
590 self.network
591 .publish_room_message(room_id.to_string(), bytes)
592 .await;
593 Ok(())
594 }
595
596 fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
597 let handle = self.clone();
598 tokio::spawn(async move {
599 while let Some(event) = net_rx.recv().await {
600 handle.process_network_event(event).await;
601 }
602 info!("event processor stopped");
603 });
604 }
605
606 fn spawn_announcement_ticker(&self) {
607 let handle = self.clone();
608 tokio::spawn(async move {
609 let mut interval =
610 tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
611 interval.tick().await; loop {
613 interval.tick().await;
614 let snapshot: Vec<(StoredRoom, u32)> = {
615 let active = handle.active_rooms.lock().unwrap();
616 active
617 .values()
618 .map(|r| (r.info.clone(), r.members.len() as u32))
619 .collect()
620 };
621 for (info, member_count) in snapshot {
622 handle.announce_room_now(&info, member_count).await;
623 }
624 }
625 });
626 }
627
628 fn spawn_discovered_room_pruner(&self) {
629 let handle = self.clone();
630 tokio::spawn(async move {
631 let mut interval = tokio::time::interval(Duration::from_secs(10));
632 interval.tick().await;
633 loop {
634 interval.tick().await;
635 let now = now_unix();
636 let mut to_drop = Vec::new();
637 {
638 let mut map = handle.discovered_rooms.lock().unwrap();
639 map.retain(|id, r| {
640 if now - r.last_seen > DISCOVERED_TTL_SECS {
641 to_drop.push(id.clone());
642 false
643 } else {
644 true
645 }
646 });
647 }
648 for id in to_drop {
649 let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
650 }
651 }
652 });
653 }
654
655 async fn process_network_event(&self, event: NetworkEvent) {
656 match event {
657 NetworkEvent::PeerDiscovered { peer_id } => {
658 let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
659 }
660 NetworkEvent::PeerExpired { .. } => {}
661 NetworkEvent::ListeningOn { address } => {
662 let _ = self.app_event_tx.send(AppEvent::ListeningOn {
663 address: address.to_string(),
664 });
665 }
666 NetworkEvent::RoomAnnouncementReceived(ann) => {
667 let our_fp = self.identity.fingerprint();
668 if let Some(salt) = &ann.passphrase_salt {
670 ROOM_SALT_CACHE
671 .lock()
672 .unwrap()
673 .insert(ann.room_id.clone(), salt.clone());
674 }
675 let discovered = DiscoveredRoom {
676 room_id: ann.room_id.clone(),
677 name: ann.name.clone(),
678 encrypted: ann.encrypted,
679 member_count: ann.member_count,
680 creator_fingerprint: ann.creator_fingerprint.clone(),
681 last_seen: now_unix(),
682 };
683 if ann.creator_fingerprint == our_fp
685 && self.active_rooms.lock().unwrap().contains_key(&ann.room_id)
686 {
687 self.discovered_rooms
689 .lock()
690 .unwrap()
691 .insert(ann.room_id.clone(), discovered);
692 return;
693 }
694 self.discovered_rooms
695 .lock()
696 .unwrap()
697 .insert(ann.room_id.clone(), discovered.clone());
698 let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
699 }
700 NetworkEvent::RoomMessageReceived {
701 room_id,
702 payload,
703 from_peer: _,
704 } => {
705 let msg: RoomMessage = match serde_json::from_slice(&payload) {
706 Ok(m) => m,
707 Err(e) => {
708 warn!(%e, "bad room message");
709 return;
710 }
711 };
712 self.handle_room_message(&room_id, msg).await;
713 }
714 NetworkEvent::DialSucceeded { peer_id, address } => {
715 let addr_s = address.to_string();
716 self.connected_dial_addrs
717 .lock()
718 .unwrap()
719 .insert(addr_s.clone(), peer_id);
720 let _ = repo::upsert_known_peer(
721 &self.db,
722 &KnownPeer {
723 address: addr_s.clone(),
724 label: None,
725 last_connected_at: Some(now_unix()),
726 last_attempt_at: Some(now_unix()),
727 created_at: now_unix(),
728 },
729 );
730 let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
731 address: addr_s,
732 peer_id,
733 });
734 }
735 NetworkEvent::DialFailed { address, error } => {
736 let addr_s = address.to_string();
737 let _ = self.app_event_tx.send(AppEvent::DialFailed {
738 address: addr_s,
739 error,
740 });
741 }
742 }
743 }
744
745 async fn handle_room_message(&self, room_id: &str, msg: RoomMessage) {
746 let our_fp = self.identity.fingerprint().to_string();
747 match msg {
748 RoomMessage::MemberAnnounce {
749 sender_fingerprint,
750 wrapped_session_key,
751 } => {
752 if sender_fingerprint == our_fp {
753 return;
754 }
755 let need_inbound = {
756 let mut rooms = self.active_rooms.lock().unwrap();
757 let room = match rooms.get_mut(room_id) {
758 Some(r) => r,
759 None => return,
760 };
761 let newly_added = room.members.insert(sender_fingerprint.clone());
762 if newly_added {
763 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
764 room_id: room_id.to_string(),
765 fingerprint: sender_fingerprint.clone(),
766 });
767 }
768 let _ = repo::upsert_room_member(
770 &self.db,
771 &StoredRoomMember {
772 room_id: room_id.to_string(),
773 peer_id: String::new(), fingerprint: sender_fingerprint.clone(),
775 last_seen: Some(now_unix()),
776 },
777 );
778 room.info.encrypted && wrapped_session_key.is_some()
779 };
780
781 if need_inbound {
782 let wrapped = wrapped_session_key.unwrap();
783 let result = {
784 let mut rooms = self.active_rooms.lock().unwrap();
785 let room = rooms.get_mut(room_id).unwrap();
786 let passphrase_key = match &room.passphrase_key {
787 Some(k) => k,
788 None => {
789 warn!("no passphrase key when receiving session key");
790 return;
791 }
792 };
793 match passphrase::unwrap(&wrapped, passphrase_key) {
794 Ok(plain) => match String::from_utf8(plain) {
795 Ok(key_b64) => {
796 let crypto = room.crypto.as_mut().unwrap();
797 crypto.add_inbound_session(&sender_fingerprint, &key_b64)
798 }
799 Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
800 },
801 Err(e) => Err(e),
802 }
803 };
804 if let Err(e) = result {
805 error!(%e, "add inbound session failed");
806 }
807 }
808 }
809 RoomMessage::SessionKeyRequest {
810 requester_fingerprint,
811 } => {
812 if requester_fingerprint == our_fp {
813 return;
814 }
815 if let Err(e) = self.broadcast_member_announce(room_id).await {
817 warn!(%e, "broadcast member announce on request");
818 }
819 }
820 RoomMessage::Encrypted {
821 sender_fingerprint,
822 session_id,
823 ciphertext_b64,
824 } => {
825 if sender_fingerprint == our_fp {
826 return;
827 }
828 let ct_bytes = match base64::Engine::decode(
829 &base64::engine::general_purpose::STANDARD,
830 &ciphertext_b64,
831 ) {
832 Ok(b) => b,
833 Err(e) => {
834 warn!(%e, "bad base64 ciphertext");
835 return;
836 }
837 };
838 let plaintext = {
839 let mut rooms = self.active_rooms.lock().unwrap();
840 let room = match rooms.get_mut(room_id) {
841 Some(r) => r,
842 None => return,
843 };
844 let crypto = match room.crypto.as_mut() {
845 Some(c) => c,
846 None => return,
847 };
848 crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
849 };
850 match plaintext {
851 Ok(pt) => {
852 let body = String::from_utf8_lossy(&pt).to_string();
853 let sent_at = now_unix();
854 let _ = repo::insert_room_message(
855 &self.db,
856 room_id,
857 &sender_fingerprint,
858 "in",
859 &body,
860 sent_at,
861 );
862 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
863 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
864 room_id: room_id.to_string(),
865 sender_fingerprint,
866 body,
867 sent_at,
868 });
869 }
870 Err(e) => {
871 debug!(%e, "decrypt failed (probably missing session key)");
872 }
873 }
874 }
875 RoomMessage::Plain {
876 sender_fingerprint,
877 body,
878 } => {
879 if sender_fingerprint == our_fp {
880 return;
881 }
882 let sent_at = now_unix();
883 let _ = repo::insert_room_message(
884 &self.db,
885 room_id,
886 &sender_fingerprint,
887 "in",
888 &body,
889 sent_at,
890 );
891 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
892 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
893 room_id: room_id.to_string(),
894 sender_fingerprint,
895 body,
896 sent_at,
897 });
898 }
899 RoomMessage::MemberLeave { sender_fingerprint } => {
900 if sender_fingerprint == our_fp {
901 return;
902 }
903 let removed = {
904 let mut rooms = self.active_rooms.lock().unwrap();
905 if let Some(room) = rooms.get_mut(room_id) {
906 room.members.remove(&sender_fingerprint)
907 } else {
908 false
909 }
910 };
911 if removed {
912 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
913 room_id: room_id.to_string(),
914 fingerprint: sender_fingerprint,
915 });
916 }
917 }
918 }
919 }
920}
921
/// Process-wide cache of passphrase salts keyed by room id.
///
/// Filled from received room announcements and consulted by
/// `AppHandle::get_room_salt` when the room is not active.
// NOTE(review): no code in this file removes entries from this map, and as
// a `static` it is shared by every `AppHandle` in the process. Confirm this
// is intended.
static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
    std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
926
/// Returns the passphrase salt length, `SALT_LEN`.
// NOTE(review): nothing in this file calls this function; it appears to
// exist only to keep the `SALT_LEN` import referenced, hence the
// `dead_code` allowance. Confirm before removing.
#[allow(dead_code)]
fn salt_len() -> usize {
    SALT_LEN
}
931
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Never panics: a clock set before 1970 yields a negative value (whole
/// seconds, truncated toward zero), and out-of-range values saturate at the
/// `i64` limits.
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX),
        // `duration_since` fails when the clock is earlier than the epoch;
        // the error carries how far before it we are.
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_secs())
            .map(|secs| -secs)
            .unwrap_or(i64::MIN),
    }
}
938
#[cfg(test)]
mod parser_tests {
    use super::parse_dial_address;

    /// Parses `input` and renders the resulting multiaddr as a string,
    /// panicking if parsing fails.
    fn rendered(input: &str) -> String {
        parse_dial_address(input)
            .expect("address should parse")
            .to_string()
    }

    #[test]
    fn parses_ipv4_port() {
        assert_eq!(rendered("10.3.72.53:9027"), "/ip4/10.3.72.53/tcp/9027");
    }

    #[test]
    fn parses_bracketed_ipv6() {
        assert_eq!(rendered("[::1]:9027"), "/ip6/::1/tcp/9027");
    }

    #[test]
    fn rejects_unbracketed_ipv6() {
        match parse_dial_address("fe80::1:9027") {
            Err(err) => assert!(err.to_string().contains("brackets")),
            Ok(addr) => panic!("expected an error, got {addr}"),
        }
    }

    #[test]
    fn passes_through_raw_multiaddr() {
        let raw = "/ip4/1.2.3.4/tcp/9000";
        assert_eq!(rendered(raw), raw);
    }

    #[test]
    fn empty_address_is_error() {
        let outcome = parse_dial_address(" ");
        assert!(outcome.is_err());
    }

    #[test]
    fn rejects_bad_port() {
        let outcome = parse_dial_address("1.2.3.4:notaport");
        assert!(outcome.is_err());
    }
}