1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use base64::engine::general_purpose::STANDARD as B64;
9use base64::Engine;
10use libp2p::{Multiaddr, PeerId};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info, warn};
13
14use crate::config;
15use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
16use crate::crypto::RoomCrypto;
17use crate::error::{HuddleError, Result};
18use crate::files::encryption::{self as file_encryption, EncryptedFileMeta};
19use crate::files::FileManager;
20use crate::identity::Identity;
21use crate::network::events::NetworkEvent;
22use crate::network::server::{ServerClient, ServerEvent};
23use crate::network::protocol::{encode_wire, RoomAnnouncement, RoomMessage, WireMessage};
24use crate::network::{self, NetworkHandle, NetworkMode};
25use crate::storage::repo::{
26 self, derive_room_id, AttachmentStatus, KnownPeer, RoomKind, StoredAttachment, StoredRoom,
27 StoredRoomMember,
28};
29use crate::storage::{self, Db};
30
31pub use self::events::{AppEvent, DiscoveredRoom};
32
33#[derive(Debug, Clone)]
36pub struct KnownPeerStatus {
37 pub address: String,
38 pub label: Option<String>,
39 pub last_connected_at: Option<i64>,
40 pub connected_peer_id: Option<PeerId>,
41 pub fingerprint: Option<String>,
45}
46
47pub fn canonical_dm_room_id(a: &str, b: &str) -> String {
58 use sha2::{Digest, Sha256};
59 let (lo, hi) = if a <= b { (a, b) } else { (b, a) };
60 let mut hasher = Sha256::new();
61 hasher.update(b"huddle-dm-v1\0");
62 hasher.update(lo.as_bytes());
63 hasher.update(b"\0");
64 hasher.update(hi.as_bytes());
65 hex::encode(&hasher.finalize()[..16])
66}
67
68pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
71 let trimmed = input.trim();
72 if trimmed.is_empty() {
73 return Err(HuddleError::Other("address is empty".into()));
74 }
75 if trimmed.starts_with('/') {
76 return trimmed
77 .parse::<Multiaddr>()
78 .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
79 }
80 if let Some(rest) = trimmed.strip_prefix('[') {
81 let (host, port) = rest
82 .split_once("]:")
83 .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
84 let port: u16 = port
85 .parse()
86 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
87 return format!("/ip6/{}/tcp/{}", host, port)
88 .parse::<Multiaddr>()
89 .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
90 }
91 let (host, port) = trimmed
92 .rsplit_once(':')
93 .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
94 if host.contains(':') {
95 return Err(HuddleError::Other(format!(
96 "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
97 )));
98 }
99 let port: u16 = port
100 .parse()
101 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
102 format!("/ip4/{}/tcp/{}", host, port)
103 .parse::<Multiaddr>()
104 .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
105}
106
107struct ActiveRoom {
109 info: StoredRoom,
110 crypto: Option<RoomCrypto>,
111 passphrase_key: Option<[u8; KEY_LEN]>,
114 members: HashSet<String>,
116 typers: HashMap<String, i64>,
119 read_only: bool,
126 issued_codes: Vec<(String, i64)>,
130}
131
132const TYPING_TTL_SECS: i64 = 3;
133
134const DISCOVERED_TTL_SECS: i64 = 45;
137const ANNOUNCE_INTERVAL_SECS: u64 = 15;
138
139struct SasFlow {
143 room_id: String,
144 partner_fingerprint: String,
145 our_secret: x25519_dalek::StaticSecret,
146 sas_code: Option<crate::crypto::sas::SasCode>,
148 our_confirmed: bool,
149 their_confirmed: bool,
150 finalized: bool,
156}
157
158pub const DEFAULT_SERVER_URL: &str =
163 "ws://huddleg2647kbrmngflqai23f4rrc7l5dnszz5lij76uhqzmkebx2mid.onion:80/ws";
164pub const DEFAULT_TOR_SOCKS: &str = "127.0.0.1:9050";
166
167#[derive(Clone)]
168pub struct AppHandle {
169 identity: Arc<Identity>,
170 network: NetworkHandle,
171 mode: NetworkMode,
172 active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
173 discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
174 restorable_rooms: Arc<Mutex<HashMap<String, StoredRoom>>>,
178 connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
181 file_manager: Arc<FileManager>,
183 db: Db,
184 session_persist_key: [u8; 32],
188 sas_flows: Arc<Mutex<HashMap<String, SasFlow>>>,
191 pending_code_secrets:
198 Arc<Mutex<HashMap<(String, String), x25519_dalek::StaticSecret>>>,
199 pending_invite_dials: Arc<Mutex<HashMap<String, String>>>,
212 nat_reachable_addrs: Arc<Mutex<HashSet<String>>>,
218 relay_circuit_addrs: Arc<Mutex<HashSet<String>>>,
224 host_addr_dial_attempts: Arc<Mutex<HashMap<String, i64>>>,
229 last_profile_broadcast_at_ms: Arc<Mutex<HashMap<String, i64>>>,
236 pending_auto_dm_addrs: Arc<Mutex<HashSet<String>>>,
243 app_event_tx: broadcast::Sender<AppEvent>,
244 server_enabled: bool,
249}
250
251const HOST_ADDR_DIAL_BACKOFF_SECS: i64 = 300;
254
255const PROFILE_REBROADCAST_FLOOR_MS: i64 = 60_000;
259
260impl AppHandle {
261 pub async fn start() -> Result<Self> {
262 Self::start_with_options(NetworkMode::Server, 0, None, Vec::new(), None, None).await
263 }
264
265 pub fn peek_mdns_enabled(master_key: Option<&[u8; 32]>) -> Result<bool> {
273 config::ensure_data_dir()?;
274 let db = storage::open_db(&config::db_path(), master_key)?;
275 let v = repo::get_setting(&db, "mdns_enabled")?
276 .map(|s| s == "1")
277 .unwrap_or(true);
278 Ok(v)
279 }
280
281 pub async fn start_with_options(
282 mode: NetworkMode,
283 port: u16,
284 master_key: Option<&[u8; 32]>,
285 relays: Vec<Multiaddr>,
286 server_url: Option<String>,
287 tor_socks: Option<String>,
288 ) -> Result<Self> {
289 config::ensure_data_dir()?;
290 let session_persist_key = match master_key {
295 Some(mk) => storage::keychain::derive_subkey(mk, b"megolm-persist"),
296 None => [0u8; 32],
297 };
298 let db = storage::open_db(&config::db_path(), master_key)?;
299 Self::start_with_db_and_options(
300 db,
301 mode,
302 port,
303 session_persist_key,
304 relays,
305 server_url,
306 tor_socks,
307 )
308 .await
309 }
310
311 pub async fn start_with_db(db: Db) -> Result<Self> {
312 Self::start_with_db_and_options(
313 db,
314 NetworkMode::Mdns,
315 0,
316 [0u8; 32],
317 Vec::new(),
318 None,
319 None,
320 )
321 .await
322 }
323
324 pub async fn start_with_db_and_options(
325 db: Db,
326 mode: NetworkMode,
327 port: u16,
328 session_persist_key: [u8; 32],
329 relays: Vec<Multiaddr>,
330 server_url: Option<String>,
331 tor_socks: Option<String>,
332 ) -> Result<Self> {
333 let identity = Self::load_or_create_identity(&db)?;
334 let identity = Arc::new(identity);
335 info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, relay_count = relays.len(), "identity loaded");
336
337 let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
338 let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
339 let network = if mode.uses_libp2p() {
346 network::start_network_with(&identity, net_event_tx, mode, port, relays)?
347 } else {
348 network::start_network_disabled()
349 };
350
351 let active_rooms = Arc::new(Mutex::new(HashMap::new()));
352 let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
353 let restorable_rooms = Arc::new(Mutex::new(HashMap::new()));
354 let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
355 let file_manager = Arc::new(FileManager::new(&config::data_dir())?);
356
357 let handle = Self {
358 identity,
359 network,
360 mode,
361 active_rooms,
362 discovered_rooms,
363 restorable_rooms,
364 connected_dial_addrs,
365 file_manager,
366 db,
367 session_persist_key,
368 sas_flows: Arc::new(Mutex::new(HashMap::new())),
369 pending_code_secrets: Arc::new(Mutex::new(HashMap::new())),
370 pending_invite_dials: Arc::new(Mutex::new(HashMap::new())),
371 nat_reachable_addrs: Arc::new(Mutex::new(HashSet::new())),
372 relay_circuit_addrs: Arc::new(Mutex::new(HashSet::new())),
373 host_addr_dial_attempts: Arc::new(Mutex::new(HashMap::new())),
374 last_profile_broadcast_at_ms: Arc::new(Mutex::new(HashMap::new())),
375 pending_auto_dm_addrs: Arc::new(Mutex::new(HashSet::new())),
376 app_event_tx,
377 server_enabled: server_url.is_some(),
378 };
379
380 handle.spawn_event_processor(net_event_rx);
381 handle.spawn_announcement_ticker();
382 handle.spawn_discovered_room_pruner();
383 handle.spawn_known_peer_reconnector();
384 handle.restore_rooms_from_db().await;
385 if let Some(url) = server_url {
391 handle.spawn_server_connection(url, tor_socks);
392 }
393 if let Err(e) = repo::cleanup_expired_pending_friend_requests(&handle.db, now_unix()) {
397 warn!(%e, "failed to sweep expired pending friend requests");
398 }
399
400 Ok(handle)
401 }
402
403 pub fn mode(&self) -> NetworkMode {
404 self.mode
405 }
406
407 pub fn server_connected(&self) -> bool {
410 self.network.has_server()
411 }
412
413 pub fn server_enabled(&self) -> bool {
419 self.server_enabled
420 }
421
422 pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
423 self.app_event_tx.subscribe()
424 }
425
426 pub fn fingerprint(&self) -> &str {
427 self.identity.fingerprint()
428 }
429
430 pub fn peer_id(&self) -> PeerId {
431 self.identity.peer_id()
432 }
433
434 pub fn sign_invite(&self, invite: crate::invite::InviteLink) -> Result<crate::invite::InviteLink> {
440 crate::invite::sign_invite(&self.identity, invite)
441 }
442
443 pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
444 let now = now_unix();
445 let our_fp = self.identity.fingerprint().to_string();
446 let mut by_id: HashMap<String, DiscoveredRoom> = self
447 .discovered_rooms
448 .lock()
449 .unwrap()
450 .clone();
451
452 for room in self.active_rooms.lock().unwrap().values() {
456 let entry = DiscoveredRoom {
457 room_id: room.info.id.clone(),
458 name: room.info.name.clone(),
459 encrypted: room.info.encrypted,
460 member_count: room.members.len() as u32,
461 creator_fingerprint: room.info.creator_fingerprint.clone(),
462 last_seen: now,
463 restorable: false,
464 host_addrs: Vec::new(),
465 kind: room.info.kind,
466 };
467 by_id
468 .entry(room.info.id.clone())
469 .and_modify(|d| {
470 d.last_seen = now;
471 if entry.member_count > d.member_count {
472 d.member_count = entry.member_count;
473 }
474 d.restorable = false;
475 d.kind = entry.kind;
476 })
477 .or_insert(entry);
478 }
479
480 for (id, stored) in self.restorable_rooms.lock().unwrap().iter() {
484 if by_id.contains_key(id) {
485 continue;
486 }
487 by_id.insert(
488 id.clone(),
489 DiscoveredRoom {
490 room_id: id.clone(),
491 name: stored.name.clone(),
492 encrypted: stored.encrypted,
493 member_count: 0,
494 creator_fingerprint: stored.creator_fingerprint.clone(),
495 last_seen: stored.last_active.unwrap_or(stored.created_at),
496 restorable: true,
497 host_addrs: Vec::new(),
498 kind: stored.kind,
499 },
500 );
501 }
502
503 by_id.retain(|room_id, d| {
511 if d.kind != RoomKind::Direct {
512 return true;
513 }
514 if self
517 .active_rooms
518 .lock()
519 .unwrap()
520 .contains_key(room_id)
521 {
522 return true;
523 }
524 canonical_dm_room_id(&our_fp, &d.creator_fingerprint) == *room_id
527 });
528
529 let mut v: Vec<DiscoveredRoom> = by_id.into_values().collect();
530 v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
531 v
532 }
533
534 pub fn dm_partner_fingerprint(&self, room_id: &str) -> Option<String> {
539 let our_fp = self.identity.fingerprint().to_string();
540 let rooms = self.active_rooms.lock().unwrap();
541 let room = rooms.get(room_id)?;
542 if room.info.kind != RoomKind::Direct {
543 return None;
544 }
545 room.members
546 .iter()
547 .find(|m| **m != our_fp)
548 .cloned()
549 }
550
551 pub fn active_room_ids(&self) -> Vec<String> {
552 self.active_rooms.lock().unwrap().keys().cloned().collect()
553 }
554
555 pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
556 self.active_rooms
557 .lock()
558 .unwrap()
559 .get(room_id)
560 .map(|r| r.info.clone())
561 }
562
563 pub fn room_members(&self, room_id: &str) -> Vec<String> {
564 self.active_rooms
565 .lock()
566 .unwrap()
567 .get(room_id)
568 .map(|r| {
569 let mut m: Vec<String> = r.members.iter().cloned().collect();
570 m.sort();
571 m
572 })
573 .unwrap_or_default()
574 }
575
576 pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
577 repo::get_room_messages(&self.db, room_id, limit)
578 }
579
580 pub fn search_room_messages(
581 &self,
582 room_id: &str,
583 query: &str,
584 limit: i64,
585 ) -> Result<Vec<repo::StoredRoomMessage>> {
586 repo::search_room_messages(&self.db, room_id, query, limit)
587 }
588
589 pub async fn start_room(
597 &self,
598 name: &str,
599 encrypted: bool,
600 passphrase: Option<&str>,
601 kind: RoomKind,
602 ) -> Result<String> {
603 if encrypted && passphrase.is_none() {
604 return Err(HuddleError::Other(
605 "encrypted room requires a passphrase".into(),
606 ));
607 }
608
609 let created_at = now_unix();
610 let creator_fp = self.identity.fingerprint().to_string();
611 let room_id = derive_room_id(&creator_fp, name, created_at);
612
613 let (passphrase_salt, passphrase_key) = if encrypted {
614 let salt = passphrase::random_salt();
615 let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
616 (Some(salt.to_vec()), Some(key))
617 } else {
618 (None, None)
619 };
620
621 let info = StoredRoom {
622 id: room_id.clone(),
623 name: name.to_string(),
624 creator_fingerprint: creator_fp.clone(),
625 encrypted,
626 passphrase_salt: passphrase_salt.clone(),
627 created_at,
628 last_active: Some(created_at),
629 kind,
630 };
631 repo::insert_room(&self.db, &info)?;
632
633 let crypto = if encrypted {
634 Some(RoomCrypto::new_for_room(
635 self.db.clone(),
636 room_id.clone(),
637 creator_fp.clone(),
638 self.session_persist_key,
639 )?)
640 } else {
641 None
642 };
643
644 let mut members = HashSet::new();
645 members.insert(creator_fp.clone());
646
647 repo::upsert_room_member(
651 &self.db,
652 &StoredRoomMember {
653 room_id: room_id.clone(),
654 peer_id: String::new(),
655 fingerprint: creator_fp.clone(),
656 last_seen: Some(created_at),
657 verified: true, ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
659 role: "owner".into(),
660 },
661 )?;
662
663 self.active_rooms.lock().unwrap().insert(
664 room_id.clone(),
665 ActiveRoom {
666 info: info.clone(),
667 crypto,
668 passphrase_key,
669 members,
670 typers: HashMap::new(),
671 read_only: false,
672 issued_codes: Vec::new(),
673 },
674 );
675
676 self.network.subscribe_room(room_id.clone()).await;
677 self.announce_room_now(&info, 1).await;
678
679 let app = self.clone();
682 let rid = room_id.clone();
683 tokio::spawn(async move {
684 tokio::time::sleep(Duration::from_millis(500)).await;
685 if let Err(e) = app.broadcast_member_announce(&rid).await {
686 warn!(%e, "broadcast member announce");
687 }
688 });
689
690 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
691 room_id: room_id.clone(),
692 });
693
694 Ok(room_id)
695 }
696
697 pub async fn start_direct(&self, partner_fingerprint: &str) -> Result<String> {
721 let our_fp = self.identity.fingerprint().to_string();
722 if partner_fingerprint == our_fp {
723 return Err(HuddleError::Other("cannot DM yourself".into()));
724 }
725 let room_id = canonical_dm_room_id(&our_fp, partner_fingerprint);
726
727 if self.active_rooms.lock().unwrap().contains_key(&room_id) {
732 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
733 room_id: room_id.clone(),
734 });
735 return Ok(room_id);
736 }
737 if repo::get_room(&self.db, &room_id)?.is_some() {
738 return self.bootstrap_direct_room(&room_id, partner_fingerprint).await;
740 }
741
742 let created_at = now_unix();
743 let name = format!("dm-{}", short_fp_for_msg(partner_fingerprint));
747
748 let dm_salt = hex::decode(&room_id).unwrap_or_else(|_| room_id.as_bytes().to_vec());
755 let info = StoredRoom {
756 id: room_id.clone(),
757 name,
758 creator_fingerprint: our_fp.clone(),
759 encrypted: true,
760 passphrase_salt: Some(dm_salt),
761 created_at,
762 last_active: Some(created_at),
763 kind: RoomKind::Direct,
764 };
765 repo::insert_room(&self.db, &info)?;
766
767 let mut members = HashSet::new();
768 members.insert(our_fp.clone());
769 repo::upsert_room_member(
770 &self.db,
771 &StoredRoomMember {
772 room_id: room_id.clone(),
773 peer_id: String::new(),
774 fingerprint: our_fp.clone(),
775 last_seen: Some(created_at),
776 verified: true,
777 ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
778 role: "member".into(),
779 },
780 )?;
781
782 let passphrase_key = self.try_derive_dm_key(&room_id, partner_fingerprint);
789
790 let crypto = Some(RoomCrypto::new_for_room(
795 self.db.clone(),
796 room_id.clone(),
797 our_fp.clone(),
798 self.session_persist_key,
799 )?);
800
801 self.active_rooms.lock().unwrap().insert(
802 room_id.clone(),
803 ActiveRoom {
804 info: info.clone(),
805 crypto,
806 passphrase_key,
807 members,
808 typers: HashMap::new(),
809 read_only: false,
810 issued_codes: Vec::new(),
811 },
812 );
813
814 self.network.subscribe_room(room_id.clone()).await;
815 self.announce_room_now(&info, 1).await;
816
817 let app = self.clone();
818 let rid = room_id.clone();
819 tokio::spawn(async move {
820 tokio::time::sleep(Duration::from_millis(500)).await;
821 if let Err(e) = app.broadcast_member_announce(&rid).await {
822 warn!(%e, "broadcast member announce for DM");
823 }
824 });
825
826 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
827 room_id: room_id.clone(),
828 });
829 Ok(room_id)
830 }
831
832 fn derive_dm_key_from_pubkey_b64(
837 &self,
838 room_id: &str,
839 pubkey_b64: &str,
840 ) -> Option<[u8; KEY_LEN]> {
841 let bytes = B64.decode(pubkey_b64).ok()?;
842 if bytes.len() != 32 {
843 return None;
844 }
845 let mut pubkey = [0u8; 32];
846 pubkey.copy_from_slice(&bytes);
847 let our_seed = self.identity.secret_bytes();
848 match crate::crypto::dm::derive_dm_key(&our_seed, &pubkey, room_id) {
849 Ok(k) => Some(k),
850 Err(e) => {
851 warn!(%e, "DM key derivation (from announce) failed");
852 None
853 }
854 }
855 }
856
857 fn try_derive_dm_key(
862 &self,
863 room_id: &str,
864 partner_fingerprint: &str,
865 ) -> Option<[u8; KEY_LEN]> {
866 let pubkey_b64 = repo::lookup_peer_ed25519_pubkey(&self.db, partner_fingerprint)
867 .ok()
868 .flatten()?;
869 let bytes = B64.decode(&pubkey_b64).ok()?;
870 if bytes.len() != 32 {
871 return None;
872 }
873 let mut pubkey = [0u8; 32];
874 pubkey.copy_from_slice(&bytes);
875 let our_seed = self.identity.secret_bytes();
876 match crate::crypto::dm::derive_dm_key(&our_seed, &pubkey, room_id) {
877 Ok(k) => Some(k),
878 Err(e) => {
879 warn!(%e, %partner_fingerprint, "DM key derivation failed");
880 None
881 }
882 }
883 }
884
885 async fn bootstrap_direct_room(
891 &self,
892 room_id: &str,
893 partner_fingerprint: &str,
894 ) -> Result<String> {
895 let our_fp = self.identity.fingerprint().to_string();
896 let info = repo::get_room(&self.db, room_id)?
897 .ok_or_else(|| HuddleError::Other(format!("DM room {room_id} not found on disk")))?;
898 let mut members = HashSet::new();
899 members.insert(our_fp.clone());
900 members.insert(partner_fingerprint.to_string());
901
902 if let Ok(stored_members) = repo::list_room_members(&self.db, room_id) {
904 for m in stored_members {
905 members.insert(m.fingerprint);
906 }
907 }
908
909 let (passphrase_key, crypto) = if info.encrypted {
917 let pk = self.try_derive_dm_key(room_id, partner_fingerprint);
918 let c = match RoomCrypto::load(
923 self.db.clone(),
924 room_id.to_string(),
925 our_fp.clone(),
926 self.session_persist_key,
927 )? {
928 Some(c) => Some(c),
929 None => Some(RoomCrypto::new_for_room(
930 self.db.clone(),
931 room_id.to_string(),
932 our_fp.clone(),
933 self.session_persist_key,
934 )?),
935 };
936 (pk, c)
937 } else {
938 (None, None)
939 };
940
941 self.active_rooms.lock().unwrap().insert(
942 room_id.to_string(),
943 ActiveRoom {
944 info: info.clone(),
945 crypto,
946 passphrase_key,
947 members,
948 typers: HashMap::new(),
949 read_only: false,
950 issued_codes: Vec::new(),
951 },
952 );
953
954 self.network.subscribe_room(room_id.to_string()).await;
955 self.announce_room_now(&info, 2).await;
956
957 let app = self.clone();
958 let rid = room_id.to_string();
959 tokio::spawn(async move {
960 tokio::time::sleep(Duration::from_millis(500)).await;
961 if let Err(e) = app.broadcast_member_announce(&rid).await {
962 warn!(%e, "broadcast member announce on DM bootstrap");
963 }
964 });
965
966 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
967 room_id: room_id.to_string(),
968 });
969 Ok(room_id.to_string())
970 }
971
972 pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
976 let (name, creator_fingerprint, encrypted, salt_opt) = {
978 if let Some(d) = self.discovered_rooms.lock().unwrap().get(room_id).cloned() {
979 let salt = self.get_room_salt(room_id);
980 (d.name, d.creator_fingerprint, d.encrypted, salt)
981 } else if let Some(stored) = self.restorable_rooms.lock().unwrap().get(room_id).cloned()
982 {
983 (
984 stored.name,
985 stored.creator_fingerprint,
986 stored.encrypted,
987 stored.passphrase_salt,
988 )
989 } else if let Some(stored) = repo::get_room(&self.db, room_id)? {
990 (
991 stored.name,
992 stored.creator_fingerprint,
993 stored.encrypted,
994 stored.passphrase_salt,
995 )
996 } else {
997 return Err(HuddleError::Other(format!("room {room_id} not found")));
998 }
999 };
1000
1001 if encrypted && passphrase.is_none() {
1002 return Err(HuddleError::Other(
1003 "encrypted room requires a passphrase".into(),
1004 ));
1005 }
1006
1007 let passphrase_key = if encrypted {
1008 let salt = salt_opt
1009 .clone()
1010 .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
1011 Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
1012 } else {
1013 None
1014 };
1015
1016 let kind = self
1021 .discovered_rooms
1022 .lock()
1023 .unwrap()
1024 .get(room_id)
1025 .map(|d| d.kind)
1026 .or_else(|| {
1027 repo::get_room(&self.db, room_id)
1028 .ok()
1029 .flatten()
1030 .map(|r| r.kind)
1031 })
1032 .unwrap_or_default();
1033
1034 let info = StoredRoom {
1035 id: room_id.to_string(),
1036 name,
1037 creator_fingerprint,
1038 encrypted,
1039 passphrase_salt: salt_opt.clone(),
1040 created_at: now_unix(),
1041 last_active: Some(now_unix()),
1042 kind,
1043 };
1044 repo::insert_room(&self.db, &info)?;
1045
1046 let crypto = if encrypted {
1047 let our_fp = self.identity.fingerprint().to_string();
1050 let existing = RoomCrypto::load(
1051 self.db.clone(),
1052 room_id.to_string(),
1053 our_fp.clone(),
1054 self.session_persist_key,
1055 )?;
1056 Some(match existing {
1057 Some(c) => c,
1058 None => RoomCrypto::new_for_room(
1059 self.db.clone(),
1060 room_id.to_string(),
1061 our_fp,
1062 self.session_persist_key,
1063 )?,
1064 })
1065 } else {
1066 None
1067 };
1068
1069 let mut members = HashSet::new();
1070 members.insert(self.identity.fingerprint().to_string());
1071
1072 self.active_rooms.lock().unwrap().insert(
1073 room_id.to_string(),
1074 ActiveRoom {
1075 info: info.clone(),
1076 crypto,
1077 passphrase_key,
1078 members,
1079 typers: HashMap::new(),
1080 read_only: false,
1081 issued_codes: Vec::new(),
1082 },
1083 );
1084 self.restorable_rooms.lock().unwrap().remove(room_id);
1086
1087 self.network.subscribe_room(room_id.to_string()).await;
1088
1089 let app = self.clone();
1090 let rid = room_id.to_string();
1091 tokio::spawn(async move {
1092 tokio::time::sleep(Duration::from_millis(500)).await;
1093 if let Err(e) = app.broadcast_member_announce(&rid).await {
1094 warn!(%e, "broadcast member announce");
1095 }
1096 let req = RoomMessage::SessionKeyRequest {
1098 requester_fingerprint: app.identity.fingerprint().to_string(),
1099 };
1100 if let Ok(bytes) = encode_wire(&req) {
1101 app.network.publish_room_message(rid.clone(), bytes).await;
1102 }
1103 });
1104
1105 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
1106 room_id: room_id.to_string(),
1107 });
1108
1109 Ok(())
1110 }
1111
1112 async fn restore_rooms_from_db(&self) {
1117 let rooms = match repo::list_rooms(&self.db) {
1118 Ok(v) => v,
1119 Err(e) => {
1120 warn!(%e, "list rooms on restore");
1121 return;
1122 }
1123 };
1124 let our_fp = self.identity.fingerprint().to_string();
1125 let count = rooms.len();
1126 for info in rooms {
1127 if info.encrypted {
1128 self.restorable_rooms
1129 .lock()
1130 .unwrap()
1131 .insert(info.id.clone(), info);
1132 continue;
1133 }
1134 let mut members = HashSet::new();
1135 members.insert(our_fp.clone());
1136 if let Ok(stored_members) = repo::list_room_members(&self.db, &info.id) {
1137 for m in stored_members {
1138 members.insert(m.fingerprint);
1139 }
1140 }
1141 let member_count = members.len() as u32;
1142 self.active_rooms.lock().unwrap().insert(
1143 info.id.clone(),
1144 ActiveRoom {
1145 info: info.clone(),
1146 crypto: None,
1147 passphrase_key: None,
1148 members,
1149 typers: HashMap::new(),
1150 read_only: false,
1151 issued_codes: Vec::new(),
1152 },
1153 );
1154 self.network.subscribe_room(info.id.clone()).await;
1155 self.announce_room_now(&info, member_count).await;
1156 info!(room_id = %info.id, name = %info.name, "restored room");
1157 }
1158 if count > 0 {
1159 debug!(count, "restored rooms from db");
1160 }
1161 }
1162
1163 pub async fn leave_room(&self, room_id: &str) -> Result<bool> {
1168 let leave_msg = RoomMessage::MemberLeave {
1172 sender_fingerprint: self.identity.fingerprint().to_string(),
1173 };
1174 let dispatched = match crate::crypto::sign_message(&self.identity, &leave_msg)
1175 .and_then(|env| {
1176 crate::network::protocol::encode_wire_signed(&env)
1177 .map_err(|e| HuddleError::Session(format!("encode signed leave: {e}")))
1178 }) {
1179 Ok(bytes) => {
1180 self.network
1181 .publish_room_message(room_id.to_string(), bytes)
1182 .await;
1183 true
1184 }
1185 Err(e) => {
1186 warn!(%e, %room_id, "failed to sign+encode MemberLeave notice");
1187 false
1188 }
1189 };
1190
1191 self.active_rooms.lock().unwrap().remove(room_id);
1192 self.network.unsubscribe_room(room_id.to_string()).await;
1193
1194 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
1195 room_id: room_id.to_string(),
1196 });
1197 Ok(dispatched)
1198 }
1199
1200 pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
1201 let our_fp = self.identity.fingerprint().to_string();
1202 let msg = {
1203 let mut rooms = self.active_rooms.lock().unwrap();
1204 let room = rooms
1205 .get_mut(room_id)
1206 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1207
1208 if room.read_only {
1209 return Err(HuddleError::Other(
1210 "this room is read-only — you joined via code without the passphrase. Ask an owner for the passphrase or wait for a key rotation that includes you.".into(),
1211 ));
1212 }
1213
1214 if room.info.encrypted {
1215 let crypto = room
1216 .crypto
1217 .as_mut()
1218 .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
1219 let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
1220 RoomMessage::Encrypted {
1221 sender_fingerprint: our_fp.clone(),
1222 session_id,
1223 ciphertext_b64: base64::Engine::encode(
1224 &base64::engine::general_purpose::STANDARD,
1225 &ct_bytes,
1226 ),
1227 }
1228 } else {
1229 RoomMessage::Plain {
1230 sender_fingerprint: our_fp.clone(),
1231 body: body.to_string(),
1232 }
1233 }
1234 };
1235
1236 let bytes = encode_wire(&msg)?;
1237 self.network
1238 .publish_room_message(room_id.to_string(), bytes)
1239 .await;
1240
1241 let now = now_unix();
1242 let msg_id =
1243 repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
1244 repo::update_room_last_active(&self.db, room_id, now)?;
1245
1246 let _ = self.app_event_tx.send(AppEvent::MessageSent {
1247 room_id: room_id.to_string(),
1248 body: body.to_string(),
1249 message_id: msg_id,
1250 });
1251
1252 Ok(())
1253 }
1254
1255 pub async fn shutdown(&self) {
1256 self.network.shutdown().await;
1257 }
1258
1259 pub async fn dial_by_id_or_username(&self, input: &str) -> Result<()> {
1286 let trimmed = input.trim();
1287 if trimmed.is_empty() {
1288 return Err(HuddleError::Other("input is empty".into()));
1289 }
1290 let target_fp = if let Some(fp) = normalize_to_fingerprint(trimmed) {
1291 fp
1292 } else {
1293 let matches = repo::find_peers_by_username(&self.db, trimmed)?;
1294 if matches.is_empty() {
1295 return Err(HuddleError::Other(format!(
1296 "no peer named `{}` known yet — paste their invite link instead",
1297 trimmed
1298 )));
1299 }
1300 if matches.len() > 1 {
1301 return Err(HuddleError::Other(format!(
1302 "username `{}` is ambiguous ({} peers share it) — use their HD- ID instead",
1303 trimmed,
1304 matches.len()
1305 )));
1306 }
1307 matches.into_iter().next().unwrap()
1308 };
1309 if target_fp == self.identity.fingerprint() {
1310 return Err(HuddleError::Other("that's your own ID".into()));
1311 }
1312 let candidates = self.resolve_dial_addrs(&target_fp);
1313 if candidates.is_empty() {
1314 return Err(HuddleError::Other(format!(
1315 "haven't seen `{}` on the network yet — ask them for an invite link",
1316 short_fp_for_msg(&target_fp)
1317 )));
1318 }
1319 let now = now_unix();
1324 for addr in &candidates {
1325 let _ = repo::upsert_known_peer(
1326 &self.db,
1327 &KnownPeer {
1328 address: addr.clone(),
1329 label: None,
1330 last_connected_at: None,
1331 last_attempt_at: Some(now),
1332 created_at: now,
1333 fingerprint: Some(target_fp.clone()),
1334 trusted: false,
1335 },
1336 );
1337 }
1338 let multiaddrs: Vec<Multiaddr> = candidates
1342 .iter()
1343 .filter_map(|s| s.parse::<Multiaddr>().ok())
1344 .collect();
1345 if multiaddrs.is_empty() {
1346 return Err(HuddleError::Other(
1347 "every known address for that peer is malformed".into(),
1348 ));
1349 }
1350 let _ = self.app_event_tx.send(AppEvent::Dialing {
1351 address: candidates[0].clone(),
1352 });
1353 info!(
1354 target_fp = %target_fp,
1355 n = multiaddrs.len(),
1356 "dialing peer with {} candidate addresses",
1357 multiaddrs.len()
1358 );
1359 {
1363 let mut pending = self.pending_auto_dm_addrs.lock().unwrap();
1364 for m in &multiaddrs {
1365 pending.insert(m.to_string());
1366 }
1367 }
1368 self.network.dial_addresses(multiaddrs).await;
1369 Ok(())
1370 }
1371
1372 fn resolve_dial_addrs(&self, fingerprint: &str) -> Vec<String> {
1380 let mut set: std::collections::HashSet<String> = std::collections::HashSet::new();
1381 for room in self.discovered_rooms.lock().unwrap().values() {
1382 if room.creator_fingerprint == fingerprint {
1383 for addr in &room.host_addrs {
1384 set.insert(addr.clone());
1385 }
1386 }
1387 }
1388 if let Ok(known) = repo::list_known_peers(&self.db) {
1389 for peer in known {
1390 if peer.fingerprint.as_deref() == Some(fingerprint) {
1391 set.insert(peer.address);
1392 }
1393 }
1394 }
1395 let mut v: Vec<String> = set.into_iter().collect();
1396 v.sort_by_key(|a| address_preference(a));
1397 v
1398 }
1399
1400 pub async fn dial(&self, input: &str) -> Result<()> {
1401 let multiaddr = parse_dial_address(input)?;
1402 let canonical = multiaddr.to_string();
1403 self.pending_auto_dm_addrs
1408 .lock()
1409 .unwrap()
1410 .insert(canonical.clone());
1411 self.dial_internal(canonical, multiaddr).await
1412 }
1413
1414 pub(crate) async fn dial_internal(
1420 &self,
1421 canonical: String,
1422 multiaddr: Multiaddr,
1423 ) -> Result<()> {
1424 info!(%canonical, "dialing");
1425 repo::upsert_known_peer(
1426 &self.db,
1427 &KnownPeer {
1428 address: canonical.clone(),
1429 label: None,
1430 last_connected_at: None,
1431 last_attempt_at: Some(now_unix()),
1432 created_at: now_unix(),
1433 fingerprint: None,
1437 trusted: false,
1438 },
1439 )?;
1440
1441 let _ = self.app_event_tx.send(AppEvent::Dialing {
1442 address: canonical.clone(),
1443 });
1444 self.network.dial(multiaddr).await;
1445 Ok(())
1446 }
1447
1448 pub fn nat_reachable_addrs(&self) -> Vec<String> {
1453 self.nat_reachable_addrs
1454 .lock()
1455 .unwrap()
1456 .iter()
1457 .cloned()
1458 .collect()
1459 }
1460
1461 pub fn dialable_addrs(&self) -> Vec<String> {
1469 let mut out: Vec<String> = self
1470 .relay_circuit_addrs
1471 .lock()
1472 .unwrap()
1473 .iter()
1474 .cloned()
1475 .collect();
1476 for a in self.nat_reachable_addrs.lock().unwrap().iter() {
1477 if !out.contains(a) {
1478 out.push(a.clone());
1479 }
1480 }
1481 out.truncate(4);
1482 out
1483 }
1484
1485 pub async fn dial_invite(&self, address: &str, claimed_fp: &str) -> Result<()> {
1498 let multiaddr = parse_dial_address(address)?;
1499 let canonical = multiaddr.to_string();
1500 self.pending_invite_dials
1501 .lock()
1502 .unwrap()
1503 .insert(canonical.clone(), claimed_fp.to_string());
1504 self.dial(address).await
1507 }
1508
1509 pub fn seed_invite_room(&self, room: &crate::invite::InviteRoom) {
1522 if let Some(salt) = room.salt_b64.as_deref().and_then(|b| B64.decode(b).ok()) {
1523 ROOM_SALT_CACHE
1524 .lock()
1525 .unwrap()
1526 .insert(room.id.clone(), salt);
1527 }
1528 let discovered = DiscoveredRoom {
1529 room_id: room.id.clone(),
1530 name: room.name.clone(),
1531 encrypted: room.encrypted,
1532 member_count: 0,
1533 creator_fingerprint: room.creator_fingerprint.clone(),
1534 last_seen: now_unix(),
1535 restorable: false,
1536 host_addrs: Vec::new(),
1537 kind: RoomKind::Group,
1539 };
1540 self.discovered_rooms
1541 .lock()
1542 .unwrap()
1543 .insert(room.id.clone(), discovered);
1544 }
1545
1546 pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
1547 let connected = self.connected_dial_addrs.lock().unwrap().clone();
1548 let stored = repo::list_known_peers(&self.db).unwrap_or_default();
1549 stored
1550 .into_iter()
1551 .map(|p| {
1552 let connected_peer = connected.get(&p.address).copied();
1553 KnownPeerStatus {
1554 address: p.address,
1555 label: p.label,
1556 last_connected_at: p.last_connected_at,
1557 connected_peer_id: connected_peer,
1558 fingerprint: p.fingerprint,
1559 }
1560 })
1561 .collect()
1562 }
1563
1564 pub async fn forget_peer(&self, address: &str) -> Result<()> {
1565 repo::forget_known_peer(&self.db, address)?;
1566 self.connected_dial_addrs.lock().unwrap().remove(address);
1567 Ok(())
1568 }
1569
1570 pub async fn redial(&self, address: &str) -> Result<()> {
1572 self.dial(address).await
1573 }
1574
1575 pub async fn accept_inbound(&self, peer_id: PeerId, address: &str) {
1580 self.network.accept_inbound(peer_id).await;
1581 self.connected_dial_addrs
1582 .lock()
1583 .unwrap()
1584 .insert(address.to_string(), peer_id);
1585 }
1586
1587 pub async fn reject_inbound(&self, peer_id: PeerId, fingerprint: &str) -> Result<()> {
1592 self.network.reject_inbound(peer_id).await;
1593 repo::block_peer(&self.db, fingerprint, now_unix())?;
1594 Ok(())
1595 }
1596
1597 pub async fn trust_inbound(
1600 &self,
1601 peer_id: PeerId,
1602 fingerprint: &str,
1603 address: &str,
1604 ) -> Result<()> {
1605 self.network.accept_inbound(peer_id).await;
1606 self.connected_dial_addrs
1607 .lock()
1608 .unwrap()
1609 .insert(address.to_string(), peer_id);
1610 repo::upsert_known_peer(
1614 &self.db,
1615 &KnownPeer {
1616 address: address.to_string(),
1617 label: None,
1618 last_connected_at: Some(now_unix()),
1619 last_attempt_at: Some(now_unix()),
1620 created_at: now_unix(),
1621 fingerprint: Some(fingerprint.to_string()),
1622 trusted: true,
1623 },
1624 )?;
1625 Ok(())
1626 }
1627
1628 pub fn list_pending_friend_requests(&self) -> Vec<repo::PendingFriendRequest> {
1636 repo::list_pending_friend_requests(&self.db).unwrap_or_default()
1637 }
1638
1639 pub fn spill_pending_friend_request(
1645 &self,
1646 peer_id: PeerId,
1647 fingerprint: &str,
1648 address: &str,
1649 ) -> Result<()> {
1650 repo::upsert_pending_friend_request(
1651 &self.db,
1652 &repo::PendingFriendRequest {
1653 fingerprint: fingerprint.to_string(),
1654 address: address.to_string(),
1655 peer_id: peer_id.to_string(),
1656 received_at: now_unix(),
1657 },
1658 )?;
1659 Ok(())
1660 }
1661
1662 pub async fn accept_pending_friend_request(&self, fingerprint: &str) -> Result<()> {
1669 let mut chosen_addr: Option<String> = None;
1670 for req in self.list_pending_friend_requests() {
1671 if req.fingerprint == fingerprint {
1672 chosen_addr = Some(req.address);
1673 break;
1674 }
1675 }
1676 repo::delete_pending_friend_requests_for_fp(&self.db, fingerprint)?;
1677 if let Some(addr) = chosen_addr {
1678 repo::upsert_known_peer(
1682 &self.db,
1683 &KnownPeer {
1684 address: addr.clone(),
1685 label: None,
1686 last_connected_at: None,
1687 last_attempt_at: Some(now_unix()),
1688 created_at: now_unix(),
1689 fingerprint: Some(fingerprint.to_string()),
1690 trusted: true,
1691 },
1692 )?;
1693 self.dial(&addr).await?;
1695 }
1696 Ok(())
1697 }
1698
1699 pub fn reject_pending_friend_request(&self, fingerprint: &str) -> Result<()> {
1704 repo::delete_pending_friend_requests_for_fp(&self.db, fingerprint)?;
1705 repo::block_peer(&self.db, fingerprint, now_unix())?;
1706 Ok(())
1707 }
1708
1709 pub async fn disconnect_peer(&self, peer_id: PeerId) {
1716 self.network.disconnect_peer(peer_id).await;
1717 }
1718
1719 fn spawn_known_peer_reconnector(&self) {
1720 let handle = self.clone();
1721 tokio::spawn(async move {
1722 tokio::time::sleep(Duration::from_millis(500)).await;
1724 let known = repo::list_known_peers(&handle.db).unwrap_or_default();
1725 for (i, peer) in known.into_iter().enumerate() {
1729 let handle = handle.clone();
1730 tokio::spawn(async move {
1731 let jitter = (peer.address.len() as u64 * 37) % 200;
1734 tokio::time::sleep(Duration::from_millis(150 * i as u64 + jitter)).await;
1735 let multiaddr = match peer.address.parse::<Multiaddr>() {
1740 Ok(m) => m,
1741 Err(_) => return,
1742 };
1743 if let Err(e) = handle.dial_internal(peer.address.clone(), multiaddr).await {
1744 debug!(%e, addr = %peer.address, "auto-reconnect failed");
1745 }
1746 });
1747 }
1748 });
1749 }
1750
1751 fn load_or_create_identity(db: &Db) -> Result<Identity> {
1756 if let Some(stored) = repo::load_identity(db)? {
1757 let mut bytes = [0u8; 32];
1758 bytes.copy_from_slice(&stored.ed25519_secret);
1759 Identity::from_secret_bytes(bytes)
1760 } else {
1761 let id = Identity::generate()?;
1762 repo::save_identity(db, &id.secret_bytes(), now_unix())?;
1763 Ok(id)
1764 }
1765 }
1766
1767 fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
1768 self.active_rooms
1769 .lock()
1770 .unwrap()
1771 .get(room_id)
1772 .and_then(|r| r.info.passphrase_salt.clone())
1773 .or_else(|| {
1774 ROOM_SALT_CACHE
1776 .lock()
1777 .unwrap()
1778 .get(room_id)
1779 .cloned()
1780 })
1781 }
1782
1783 async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
1784 let owner_fingerprints =
1785 repo::list_room_owners(&self.db, &info.id).unwrap_or_default();
1786 let verified_only = repo::get_room_verified_only(&self.db, &info.id).unwrap_or(false);
1787 let host_addrs = self.dialable_addrs();
1788 let ann = RoomAnnouncement {
1789 room_id: info.id.clone(),
1790 name: info.name.clone(),
1791 encrypted: info.encrypted,
1792 passphrase_salt: info.passphrase_salt.clone(),
1793 member_count,
1794 creator_fingerprint: info.creator_fingerprint.clone(),
1795 announced_at: now_unix(),
1796 owner_fingerprints,
1797 verified_only,
1798 host_addrs,
1799 kind: info.kind,
1800 };
1801 self.network.announce_room(ann).await;
1802 }
1803
1804 async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
1805 let our_fp = self.identity.fingerprint().to_string();
1806 let wrapped = {
1807 let mut rooms = self.active_rooms.lock().unwrap();
1808 let room = rooms
1809 .get_mut(room_id)
1810 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
1811 if room.info.encrypted {
1812 let crypto = room.crypto.as_mut().unwrap();
1813 let session_key = crypto.our_session_key_b64();
1814 match room.passphrase_key.as_ref() {
1815 Some(passphrase_key) => {
1816 Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
1817 }
1818 None if room.info.kind == RoomKind::Direct => {
1819 None
1829 }
1830 None => {
1831 return Err(HuddleError::Session("missing passphrase key".into()));
1832 }
1833 }
1834 } else {
1835 None
1836 }
1837 };
1838 let display_name = repo::get_display_name(&self.db).unwrap_or(None);
1839 let msg = RoomMessage::MemberAnnounce {
1840 sender_fingerprint: our_fp,
1841 wrapped_session_key: wrapped,
1842 display_name,
1843 sender_ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
1844 };
1845 let env = crate::crypto::sign_message(&self.identity, &msg)?;
1850 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
1851 self.network
1852 .publish_room_message(room_id.to_string(), bytes)
1853 .await;
1854 Ok(())
1855 }
1856
1857 fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
1858 let handle = self.clone();
1859 tokio::spawn(async move {
1860 while let Some(event) = net_rx.recv().await {
1861 handle.process_network_event(event).await;
1862 }
1863 info!("event processor stopped");
1864 });
1865 }
1866
1867 fn spawn_server_connection(&self, url: String, tor_socks: Option<String>) {
1875 let socks = if url.contains(".onion") {
1881 Some(tor_socks.unwrap_or_else(|| DEFAULT_TOR_SOCKS.to_string()))
1882 } else {
1883 None
1884 };
1885 let handle = self.clone();
1886 tokio::spawn(async move {
1887 let mut backoff = 1u64;
1888 loop {
1889 let fp = handle.identity.fingerprint().to_string();
1890 let rooms: Vec<String> =
1891 handle.active_rooms.lock().unwrap().keys().cloned().collect();
1892 match ServerClient::connect(&url, socks.as_deref(), fp, rooms).await {
1893 Ok((client, mut rx)) => {
1894 backoff = 1;
1895 handle.network.attach_server(client);
1896 info!(%url, "connected to huddle-server");
1897 while let Some(ev) = rx.recv().await {
1898 match ev {
1899 ServerEvent::Message { room, payload, .. } => {
1900 handle
1903 .process_network_event(
1904 NetworkEvent::RoomMessageReceived {
1905 room_id: room,
1906 payload,
1907 from_peer: PeerId::random(),
1908 },
1909 )
1910 .await;
1911 }
1912 ServerEvent::Ready | ServerEvent::Sent { .. } => {}
1915 ServerEvent::Disconnected => break,
1916 }
1917 }
1918 handle.network.detach_server();
1919 warn!("huddle-server connection closed; reconnecting");
1920 }
1921 Err(e) => warn!(error = %e, "huddle-server connect failed; will retry"),
1922 }
1923 tokio::time::sleep(Duration::from_secs(backoff)).await;
1924 backoff = (backoff * 2).min(30);
1925 }
1926 });
1927 }
1928
1929 fn spawn_announcement_ticker(&self) {
1930 let handle = self.clone();
1931 tokio::spawn(async move {
1932 let mut interval =
1933 tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
1934 interval.tick().await; loop {
1936 interval.tick().await;
1937 let snapshot: Vec<(StoredRoom, u32)> = {
1938 let active = handle.active_rooms.lock().unwrap();
1939 active
1940 .values()
1941 .map(|r| (r.info.clone(), r.members.len() as u32))
1942 .collect()
1943 };
1944 for (info, member_count) in snapshot {
1945 handle.announce_room_now(&info, member_count).await;
1946 }
1947 }
1948 });
1949 }
1950
1951 fn spawn_discovered_room_pruner(&self) {
1952 let handle = self.clone();
1953 tokio::spawn(async move {
1954 let mut interval = tokio::time::interval(Duration::from_secs(10));
1955 interval.tick().await;
1956 loop {
1957 interval.tick().await;
1958 let now = now_unix();
1959 let mut to_drop = Vec::new();
1960 {
1961 let mut map = handle.discovered_rooms.lock().unwrap();
1962 map.retain(|id, r| {
1963 if now - r.last_seen > DISCOVERED_TTL_SECS {
1964 to_drop.push(id.clone());
1965 false
1966 } else {
1967 true
1968 }
1969 });
1970 }
1971 for id in to_drop {
1972 let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
1973 }
1974 }
1975 });
1976 }
1977
1978 async fn process_network_event(&self, event: NetworkEvent) {
1979 match event {
1980 NetworkEvent::PeerDiscovered { peer_id } => {
1981 let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
1982 }
1983 NetworkEvent::PeerExpired { peer_id } => {
1984 self.connected_dial_addrs
1990 .lock()
1991 .unwrap()
1992 .retain(|_addr, pid| *pid != peer_id);
1993 let _ = self.app_event_tx.send(AppEvent::PeerExpired { peer_id });
1994 }
1995 NetworkEvent::PeerDisconnected { peer_id } => {
1996 self.connected_dial_addrs
2002 .lock()
2003 .unwrap()
2004 .retain(|_addr, pid| *pid != peer_id);
2005 let _ = self.app_event_tx.send(AppEvent::PeerExpired { peer_id });
2006 }
2007 NetworkEvent::ListeningOn { address } => {
2014 let _ = self.app_event_tx.send(AppEvent::ListeningOn {
2015 address: address.to_string(),
2016 });
2017 }
2018 NetworkEvent::RoomAnnouncementReceived(ann) => {
2019 if let Some(salt) = &ann.passphrase_salt {
2021 ROOM_SALT_CACHE
2022 .lock()
2023 .unwrap()
2024 .insert(ann.room_id.clone(), salt.clone());
2025 }
2026 let our_fp_for_dial = self.identity.fingerprint().to_string();
2031 if ann.creator_fingerprint != our_fp_for_dial && !ann.host_addrs.is_empty() {
2032 let now = now_unix();
2033 let should_dial = {
2034 let mut attempts = self.host_addr_dial_attempts.lock().unwrap();
2035 match attempts.get(&ann.creator_fingerprint).copied() {
2036 Some(last) if now - last < HOST_ADDR_DIAL_BACKOFF_SECS => false,
2037 _ => {
2038 attempts.insert(ann.creator_fingerprint.clone(), now);
2039 true
2040 }
2041 }
2042 };
2043 if should_dial {
2044 if let Some(first) = ann.host_addrs.first() {
2045 info!(
2046 announcer = %ann.creator_fingerprint,
2047 addr = %first,
2048 "opportunistic dial via room announcement host_addrs"
2049 );
2050 if let Ok(multiaddr) = first.parse::<Multiaddr>() {
2055 let canonical = multiaddr.to_string();
2056 let _ = self.dial_internal(canonical, multiaddr).await;
2057 }
2058 }
2059 }
2060 }
2061 let discovered = DiscoveredRoom {
2062 room_id: ann.room_id.clone(),
2063 name: ann.name.clone(),
2064 encrypted: ann.encrypted,
2065 member_count: ann.member_count,
2066 creator_fingerprint: ann.creator_fingerprint.clone(),
2067 last_seen: now_unix(),
2068 restorable: false,
2069 host_addrs: ann.host_addrs.clone(),
2070 kind: ann.kind,
2071 };
2072 if self.active_rooms.lock().unwrap().contains_key(&ann.room_id) {
2077 self.discovered_rooms
2078 .lock()
2079 .unwrap()
2080 .insert(ann.room_id.clone(), discovered);
2081 return;
2082 }
2083 if ann.kind == RoomKind::Direct {
2093 let our_fp_for_filter = self.identity.fingerprint().to_string();
2094 if canonical_dm_room_id(&our_fp_for_filter, &ann.creator_fingerprint)
2095 != ann.room_id
2096 {
2097 debug!(
2098 announcer = %ann.creator_fingerprint,
2099 room_id = %ann.room_id,
2100 "dropping Direct announcement: not addressed to us"
2101 );
2102 return;
2103 }
2104 if repo::is_peer_blocked(&self.db, &ann.creator_fingerprint).unwrap_or(false)
2116 {
2117 debug!(
2118 partner = %ann.creator_fingerprint,
2119 "ignoring Direct announcement from blocked peer"
2120 );
2121 return;
2122 }
2123 self.discovered_rooms
2124 .lock()
2125 .unwrap()
2126 .insert(ann.room_id.clone(), discovered.clone());
2127 let _ = self
2128 .app_event_tx
2129 .send(AppEvent::RoomDiscovered(discovered.clone()));
2130 let app = self.clone();
2131 let partner = ann.creator_fingerprint.clone();
2132 let rid = ann.room_id.clone();
2133 tokio::spawn(async move {
2134 if let Err(e) = app.start_direct(&partner).await {
2135 debug!(%e, room_id = %rid, "auto-bootstrap of inbound DM failed");
2136 }
2137 });
2138 return;
2139 }
2140 self.discovered_rooms
2141 .lock()
2142 .unwrap()
2143 .insert(ann.room_id.clone(), discovered.clone());
2144 let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
2145 }
2146 NetworkEvent::RoomMessageReceived {
2147 room_id,
2148 payload,
2149 from_peer: _,
2150 } => {
2151 let wire: WireMessage = match serde_json::from_slice(&payload) {
2158 Ok(w) => w,
2159 Err(e) => {
2160 warn!(%e, "bad wire envelope");
2161 return;
2162 }
2163 };
2164 let (msg, verified_signer) = match wire {
2165 WireMessage::Plain(m) => (m, None),
2166 WireMessage::Signed(env) => {
2167 let claimed_pubkey = env.ed25519_pubkey_b64.clone();
2168 match crate::crypto::verify_signed(&env) {
2169 Ok((m, fp)) => {
2170 match repo::get_member_ed25519_pubkey(
2177 &self.db, &room_id, &fp,
2178 ) {
2179 Ok(Some(known)) if known != claimed_pubkey => {
2180 warn!(
2181 %fp, %room_id,
2182 "pubkey mismatch vs stored; dropping signed message"
2183 );
2184 return;
2185 }
2186 _ => {}
2187 }
2188 (m, Some(fp))
2189 }
2190 Err(e) => {
2191 warn!(%e, fp = %env.fingerprint, "signed envelope verify failed");
2192 return;
2193 }
2194 }
2195 }
2196 };
2197 self.handle_room_message(&room_id, msg, verified_signer).await;
2198 }
2199 NetworkEvent::DialSucceeded { peer_id, address } => {
2200 let addr_s = address.to_string();
2201 self.connected_dial_addrs
2202 .lock()
2203 .unwrap()
2204 .insert(addr_s.clone(), peer_id);
2205 let _ = repo::upsert_known_peer(
2209 &self.db,
2210 &KnownPeer {
2211 address: addr_s.clone(),
2212 label: None,
2213 last_connected_at: Some(now_unix()),
2214 last_attempt_at: Some(now_unix()),
2215 created_at: now_unix(),
2216 fingerprint: None,
2217 trusted: false,
2218 },
2219 );
2220 let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
2221 address: addr_s,
2222 peer_id,
2223 });
2224 }
2225 NetworkEvent::DialFailed { address, error } => {
2226 let addr_s = address.to_string();
2227 let _ = self.app_event_tx.send(AppEvent::DialFailed {
2228 address: addr_s,
2229 error,
2230 });
2231 }
2232 NetworkEvent::PeerIdentified { peer_id, fingerprint } => {
2233 let matched_addrs: Vec<String> = {
2239 let map = self.connected_dial_addrs.lock().unwrap();
2240 map.iter()
2241 .filter_map(|(addr, pid)| {
2242 if *pid == peer_id {
2243 Some(addr.clone())
2244 } else {
2245 None
2246 }
2247 })
2248 .collect()
2249 };
2250 let mismatch = {
2260 let mut map = self.pending_invite_dials.lock().unwrap();
2261 let mut found: Option<(String, String)> = None;
2262 for addr in &matched_addrs {
2263 if let Some(claimed) = map.remove(addr) {
2264 if claimed != fingerprint {
2265 found = Some((addr.clone(), claimed));
2266 break;
2267 }
2268 }
2269 }
2270 found
2271 };
2272 if let Some((addr, claimed)) = mismatch {
2273 warn!(
2274 %addr, %claimed, actual=%fingerprint,
2275 "invite fingerprint mismatch — disconnecting"
2276 );
2277 self.network.disconnect_peer(peer_id).await;
2278 let _ = self.app_event_tx.send(AppEvent::InviteFingerprintMismatch {
2279 address: addr,
2280 claimed,
2281 actual: fingerprint.clone(),
2282 });
2283 return;
2284 }
2285 let should_auto_dm = {
2292 let mut pending = self.pending_auto_dm_addrs.lock().unwrap();
2293 let mut any_matched = false;
2294 for addr in &matched_addrs {
2295 if pending.remove(addr) {
2296 any_matched = true;
2297 }
2298 }
2299 any_matched
2300 };
2301 for addr in matched_addrs {
2302 let _ = repo::upsert_known_peer(
2303 &self.db,
2304 &KnownPeer {
2305 address: addr,
2306 label: None,
2307 last_connected_at: Some(now_unix()),
2308 last_attempt_at: Some(now_unix()),
2309 created_at: now_unix(),
2310 fingerprint: Some(fingerprint.clone()),
2311 trusted: true,
2312 },
2313 );
2314 }
2315 let blocked = repo::is_peer_blocked(&self.db, &fingerprint).unwrap_or(false);
2328 if should_auto_dm && !blocked && fingerprint != self.identity.fingerprint() {
2329 match self.start_direct(&fingerprint).await {
2330 Ok(room_id) => {
2331 let _ = self.app_event_tx.send(AppEvent::AutoOpenDm {
2332 room_id,
2333 fingerprint: fingerprint.clone(),
2334 });
2335 }
2336 Err(e) => {
2337 debug!(%e, fp = %fingerprint, "auto-DM after dial failed");
2338 }
2339 }
2340 }
2341 let our_username = repo::get_display_name(&self.db).unwrap_or(None);
2349 if our_username.is_some() {
2350 let now_ms = now_unix_ms();
2351 let should_send = {
2352 let mut last = self.last_profile_broadcast_at_ms.lock().unwrap();
2353 match last.get(&fingerprint) {
2354 Some(prev) if now_ms - prev < PROFILE_REBROADCAST_FLOOR_MS => false,
2355 _ => {
2356 last.insert(fingerprint.clone(), now_ms);
2357 true
2358 }
2359 }
2360 };
2361 if should_send {
2362 let msg = RoomMessage::ProfileUpdate {
2363 sender_fingerprint: self.identity.fingerprint().to_string(),
2364 username: our_username,
2365 updated_at: now_ms,
2366 };
2367 if let Ok(env) = crate::crypto::sign_message(&self.identity, &msg) {
2368 if let Ok(bytes) =
2369 crate::network::protocol::encode_wire_signed(&env)
2370 {
2371 let rooms: Vec<String> = self
2372 .active_rooms
2373 .lock()
2374 .unwrap()
2375 .keys()
2376 .cloned()
2377 .collect();
2378 for room_id in rooms {
2379 self.network
2380 .publish_room_message(room_id, bytes.clone())
2381 .await;
2382 }
2383 }
2384 }
2385 }
2386 }
2387 }
2388 NetworkEvent::RelayReservationEstablished { address } => {
2389 info!(addr = %address, "relay reservation established");
2394 self.relay_circuit_addrs
2395 .lock()
2396 .unwrap()
2397 .insert(address.to_string());
2398 let _ = self.app_event_tx.send(AppEvent::ListeningOn {
2399 address: address.to_string(),
2400 });
2401 }
2402 NetworkEvent::NatProbeResult {
2403 tested_addr,
2404 reachable,
2405 } => {
2406 let addr_s = tested_addr.to_string();
2407 let (transitioned, becomes_reachable) = {
2408 let mut set = self.nat_reachable_addrs.lock().unwrap();
2409 let was_empty = set.is_empty();
2410 if reachable {
2411 set.insert(addr_s.clone());
2412 } else {
2413 set.remove(&addr_s);
2414 }
2415 let is_empty = set.is_empty();
2416 (was_empty != is_empty, !is_empty)
2417 };
2418 if transitioned {
2419 let label = if becomes_reachable {
2420 "reachable".to_string()
2421 } else {
2422 "private".to_string()
2423 };
2424 info!(reachable = %becomes_reachable, "NAT reachability changed");
2425 let _ = self.app_event_tx.send(AppEvent::NatStatusChanged {
2426 label,
2427 reachable: becomes_reachable,
2428 });
2429 }
2430 }
2431 NetworkEvent::DcutrUpgrade {
2432 remote_peer,
2433 success,
2434 } => {
2435 if success {
2436 let s = remote_peer.to_base58();
2440 let tail: String = s.chars().rev().take(8).collect::<String>()
2441 .chars()
2442 .rev()
2443 .collect();
2444 let _ = self.app_event_tx.send(AppEvent::DcutrSucceeded {
2445 peer_label: tail,
2446 });
2447 }
2448 }
2449 NetworkEvent::InboundDial {
2450 peer_id,
2451 fingerprint,
2452 address,
2453 } => {
2454 if repo::is_peer_blocked(&self.db, &fingerprint).unwrap_or(false) {
2456 info!(%fingerprint, "inbound dial auto-rejected: peer is blocked");
2457 self.network.reject_inbound(peer_id).await;
2458 return;
2459 }
2460 let global_verified_only =
2465 repo::get_setting(&self.db, "verified_only_inbound")
2466 .ok()
2467 .flatten()
2468 .map(|v| v == "1")
2469 .unwrap_or(false);
2470 if global_verified_only {
2471 let is_verified =
2472 repo::is_globally_verified(&self.db, &fingerprint).unwrap_or(false)
2473 || repo::is_fingerprint_trusted(&self.db, &fingerprint)
2474 .unwrap_or(false);
2475 if !is_verified {
2476 info!(
2477 %fingerprint,
2478 "inbound dial auto-rejected: verified-only mode"
2479 );
2480 self.network.reject_inbound(peer_id).await;
2481 return;
2482 }
2483 }
2484 if repo::is_fingerprint_trusted(&self.db, &fingerprint).unwrap_or(false) {
2485 info!(%fingerprint, "inbound dial auto-accepted: peer is trusted");
2486 self.connected_dial_addrs
2489 .lock()
2490 .unwrap()
2491 .insert(address.to_string(), peer_id);
2492 let _ = repo::upsert_known_peer(
2493 &self.db,
2494 &KnownPeer {
2495 address: address.to_string(),
2496 label: None,
2497 last_connected_at: Some(now_unix()),
2498 last_attempt_at: Some(now_unix()),
2499 created_at: now_unix(),
2500 fingerprint: Some(fingerprint),
2501 trusted: true,
2502 },
2503 );
2504 self.network.accept_inbound(peer_id).await;
2505 return;
2506 }
2507 let _ = self.app_event_tx.send(AppEvent::InboundDial {
2509 peer_id,
2510 fingerprint,
2511 address: address.to_string(),
2512 });
2513 }
2514 }
2515 }
2516
2517 async fn handle_room_message(
2523 &self,
2524 room_id: &str,
2525 msg: RoomMessage,
2526 verified_signer: Option<String>,
2527 ) {
2528 let our_fp = self.identity.fingerprint().to_string();
2529 match msg {
2530 RoomMessage::MemberAnnounce {
2531 sender_fingerprint,
2532 wrapped_session_key,
2533 display_name,
2534 sender_ed25519_pubkey,
2535 } => {
2536 if sender_fingerprint == our_fp {
2537 return;
2538 }
2539 let signer = match verified_signer {
2549 Some(fp) => fp,
2550 None => {
2551 warn!(%sender_fingerprint, %room_id, "MemberAnnounce arrived unsigned; dropping");
2552 return;
2553 }
2554 };
2555 if signer != sender_fingerprint {
2556 warn!(%signer, %sender_fingerprint, %room_id, "MemberAnnounce signer mismatch; dropping");
2557 return;
2558 }
2559 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
2562 .unwrap_or(false)
2563 {
2564 info!(%sender_fingerprint, %room_id, "dropping MemberAnnounce from banned peer");
2565 return;
2566 }
2567 if repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
2574 && !repo::is_globally_verified(&self.db, &sender_fingerprint).unwrap_or(false)
2575 {
2576 info!(
2577 %sender_fingerprint, %room_id,
2578 "dropping MemberAnnounce: room is verified-only and joiner isn't verified"
2579 );
2580 let owners = repo::list_room_owners(&self.db, room_id).unwrap_or_default();
2581 let lowest_owner = owners.iter().min().cloned();
2582 if lowest_owner.as_deref() == Some(&our_fp) {
2583 let msg = RoomMessage::JoinRefused {
2584 room_id: room_id.to_string(),
2585 target_fingerprint: sender_fingerprint.clone(),
2586 reason: "room requires SAS verification — ask an existing member to verify you".into(),
2587 };
2588 if let Ok(env) = crate::crypto::sign_message(&self.identity, &msg) {
2589 if let Ok(bytes) =
2590 crate::network::protocol::encode_wire_signed(&env)
2591 {
2592 self.network
2593 .publish_room_message(room_id.to_string(), bytes)
2594 .await;
2595 }
2596 }
2597 }
2598 return;
2599 }
2600 let need_inbound = {
2601 let mut rooms = self.active_rooms.lock().unwrap();
2602 let room = match rooms.get_mut(room_id) {
2603 Some(r) => r,
2604 None => return,
2605 };
2606 if room.info.kind == RoomKind::Direct
2614 && !room.members.contains(&sender_fingerprint)
2615 && room.members.len() >= 2
2616 {
2617 info!(
2618 %sender_fingerprint, %room_id,
2619 "dropping MemberAnnounce on Direct room: already at 2-member cap"
2620 );
2621 return;
2622 }
2623 let newly_added = room.members.insert(sender_fingerprint.clone());
2624 if newly_added {
2625 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
2626 room_id: room_id.to_string(),
2627 fingerprint: sender_fingerprint.clone(),
2628 });
2629 }
2630 let _ = repo::upsert_room_member(
2635 &self.db,
2636 &StoredRoomMember {
2637 room_id: room_id.to_string(),
2638 peer_id: String::new(), fingerprint: sender_fingerprint.clone(),
2640 last_seen: Some(now_unix()),
2641 verified: false,
2642 ed25519_pubkey: sender_ed25519_pubkey.clone(),
2643 role: "member".into(),
2649 },
2650 );
2651 if let Some(name) = display_name.as_deref() {
2652 let _ = repo::set_member_display_name(
2653 &self.db,
2654 room_id,
2655 &sender_fingerprint,
2656 Some(name),
2657 );
2658 }
2659 room.info.encrypted && wrapped_session_key.is_some()
2660 };
2661
2662 if matches!(
2669 self.active_rooms
2670 .lock()
2671 .unwrap()
2672 .get(room_id)
2673 .map(|r| (r.info.kind, r.passphrase_key.is_none())),
2674 Some((RoomKind::Direct, true))
2675 ) {
2676 if let Some(pubkey_b64) = sender_ed25519_pubkey.as_deref() {
2677 if let Some(key) =
2678 self.derive_dm_key_from_pubkey_b64(room_id, pubkey_b64)
2679 {
2680 let mut rooms = self.active_rooms.lock().unwrap();
2681 if let Some(room) = rooms.get_mut(room_id) {
2682 room.passphrase_key = Some(key);
2683 }
2684 drop(rooms);
2685 let app = self.clone();
2690 let rid = room_id.to_string();
2691 tokio::spawn(async move {
2692 if let Err(e) = app.broadcast_member_announce(&rid).await {
2693 warn!(%e, "re-broadcast DM announce after key derivation");
2694 }
2695 });
2696 }
2697 }
2698 }
2699
2700 if need_inbound {
2701 let wrapped = wrapped_session_key.unwrap();
2702 let result = {
2703 let mut rooms = self.active_rooms.lock().unwrap();
2704 let room = rooms.get_mut(room_id).unwrap();
2705 let passphrase_key = match &room.passphrase_key {
2706 Some(k) => k,
2707 None => {
2708 warn!("no passphrase key when receiving session key");
2709 return;
2710 }
2711 };
2712 match passphrase::unwrap(&wrapped, passphrase_key) {
2713 Ok(plain) => match String::from_utf8(plain) {
2714 Ok(key_b64) => {
2715 let crypto = room.crypto.as_mut().unwrap();
2716 crypto.add_inbound_session(&sender_fingerprint, &key_b64)
2717 }
2718 Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
2719 },
2720 Err(e) => Err(e),
2721 }
2722 };
2723 if let Err(e) = result {
2724 error!(%e, "add inbound session failed");
2725 }
2726 }
2727 }
2728 RoomMessage::SessionKeyRequest {
2729 requester_fingerprint,
2730 } => {
2731 if requester_fingerprint == our_fp {
2732 return;
2733 }
2734 if let Err(e) = self.broadcast_member_announce(room_id).await {
2736 warn!(%e, "broadcast member announce on request");
2737 }
2738 }
2739 RoomMessage::Encrypted {
2740 sender_fingerprint,
2741 session_id,
2742 ciphertext_b64,
2743 } => {
2744 if sender_fingerprint == our_fp {
2745 return;
2746 }
2747 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
2755 .unwrap_or(false)
2756 {
2757 debug!(%sender_fingerprint, %room_id, "dropping Encrypted from banned peer");
2758 return;
2759 }
2760 let ct_bytes = match base64::Engine::decode(
2761 &base64::engine::general_purpose::STANDARD,
2762 &ciphertext_b64,
2763 ) {
2764 Ok(b) => b,
2765 Err(e) => {
2766 warn!(%e, "bad base64 ciphertext");
2767 return;
2768 }
2769 };
2770 let plaintext = {
2771 let mut rooms = self.active_rooms.lock().unwrap();
2772 let room = match rooms.get_mut(room_id) {
2773 Some(r) => r,
2774 None => return,
2775 };
2776 let crypto = match room.crypto.as_mut() {
2777 Some(c) => c,
2778 None => return,
2779 };
2780 crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
2781 };
2782 match plaintext {
2783 Ok(pt) => {
2784 let body = String::from_utf8_lossy(&pt).to_string();
2785 let sent_at = now_unix();
2786 let _ = repo::insert_room_message(
2787 &self.db,
2788 room_id,
2789 &sender_fingerprint,
2790 "in",
2791 &body,
2792 sent_at,
2793 );
2794 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
2795 self.maybe_emit_mention(room_id, &body);
2796 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
2797 room_id: room_id.to_string(),
2798 sender_fingerprint,
2799 body,
2800 sent_at,
2801 });
2802 }
2803 Err(e) => {
2804 debug!(%e, "decrypt failed (probably missing session key)");
2805 }
2806 }
2807 }
2808 RoomMessage::Plain {
2809 sender_fingerprint,
2810 body,
2811 } => {
2812 if sender_fingerprint == our_fp {
2813 return;
2814 }
2815 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
2816 .unwrap_or(false)
2817 {
2818 debug!(%sender_fingerprint, %room_id, "dropping Plain from banned peer");
2819 return;
2820 }
2821 let sent_at = now_unix();
2822 let _ = repo::insert_room_message(
2823 &self.db,
2824 room_id,
2825 &sender_fingerprint,
2826 "in",
2827 &body,
2828 sent_at,
2829 );
2830 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
2831 self.maybe_emit_mention(room_id, &body);
2832 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
2833 room_id: room_id.to_string(),
2834 sender_fingerprint,
2835 body,
2836 sent_at,
2837 });
2838 }
2839 RoomMessage::Typing { sender_fingerprint } => {
2840 if sender_fingerprint == our_fp {
2841 return;
2842 }
2843 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
2844 .unwrap_or(false)
2845 {
2846 return;
2847 }
2848 let expiry = now_unix() + TYPING_TTL_SECS;
2849 let mut rooms = self.active_rooms.lock().unwrap();
2850 if let Some(room) = rooms.get_mut(room_id) {
2851 room.typers.insert(sender_fingerprint, expiry);
2852 }
2853 drop(rooms);
2854 let _ = self.app_event_tx.send(AppEvent::TypingChanged {
2855 room_id: room_id.to_string(),
2856 });
2857 }
2858 RoomMessage::RotateRoomKey {
2859 rotator_fingerprint,
2860 new_salt,
2861 } => {
2862 if rotator_fingerprint == our_fp {
2863 return;
2864 }
2865 let signer = match verified_signer {
2870 Some(fp) => fp,
2871 None => {
2872 warn!(%room_id, "RotateRoomKey arrived unsigned; dropping");
2873 return;
2874 }
2875 };
2876 if signer != rotator_fingerprint {
2877 warn!(
2878 %signer, %rotator_fingerprint, %room_id,
2879 "RotateRoomKey signer mismatch with claimed rotator; dropping"
2880 );
2881 return;
2882 }
2883 let _ = self.app_event_tx.send(AppEvent::RotationRequested {
2884 room_id: room_id.to_string(),
2885 rotator_fingerprint,
2886 new_salt,
2887 });
2888 }
2889 RoomMessage::MemberLeave { sender_fingerprint } => {
2890 if sender_fingerprint == our_fp {
2891 return;
2892 }
2893 let signer = match verified_signer {
2897 Some(fp) => fp,
2898 None => {
2899 warn!(%sender_fingerprint, %room_id, "MemberLeave arrived unsigned; dropping");
2900 return;
2901 }
2902 };
2903 if signer != sender_fingerprint {
2904 warn!(%signer, %sender_fingerprint, %room_id, "MemberLeave signer mismatch; dropping");
2905 return;
2906 }
2907 let removed = {
2908 let mut rooms = self.active_rooms.lock().unwrap();
2909 if let Some(room) = rooms.get_mut(room_id) {
2910 room.members.remove(&sender_fingerprint)
2911 } else {
2912 false
2913 }
2914 };
2915 if removed {
2916 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
2917 room_id: room_id.to_string(),
2918 fingerprint: sender_fingerprint,
2919 });
2920 }
2921 }
2922 RoomMessage::FileOffer {
2923 sender_fingerprint,
2924 file_id,
2925 name,
2926 size_bytes,
2927 mime,
2928 chunk_count,
2929 encrypted_meta,
2930 } => {
2931 if sender_fingerprint == our_fp {
2932 return; }
2934 let signer = match verified_signer {
2939 Some(fp) => fp,
2940 None => {
2941 warn!(%sender_fingerprint, %room_id, %file_id, "FileOffer arrived unsigned; dropping");
2942 return;
2943 }
2944 };
2945 if signer != sender_fingerprint {
2946 warn!(%signer, %sender_fingerprint, %room_id, %file_id, "FileOffer signer mismatch; dropping");
2947 return;
2948 }
2949 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
2952 .unwrap_or(false)
2953 {
2954 info!(%sender_fingerprint, %room_id, %file_id, "dropping FileOffer from banned peer");
2955 return;
2956 }
2957 self.handle_file_offer(
2958 room_id,
2959 sender_fingerprint,
2960 file_id,
2961 name,
2962 size_bytes,
2963 mime,
2964 chunk_count,
2965 encrypted_meta,
2966 );
2967 }
2968 RoomMessage::FileChunk {
2969 sender_fingerprint,
2970 file_id,
2971 chunk_index,
2972 total_chunks,
2973 data_b64,
2974 } => {
2975 if sender_fingerprint == our_fp {
2976 return;
2977 }
2978 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
2979 .unwrap_or(false)
2980 {
2981 return;
2982 }
2983 self.handle_file_chunk(
2984 room_id,
2985 sender_fingerprint,
2986 file_id,
2987 chunk_index,
2988 total_chunks,
2989 data_b64,
2990 );
2991 }
2992 RoomMessage::OwnerGrant {
2993 room_id: announced_room_id,
2994 target_fingerprint,
2995 } => {
2996 if announced_room_id != room_id {
3001 warn!(payload_room = %announced_room_id, topic_room = %room_id, "OwnerGrant room mismatch");
3002 return;
3003 }
3004 let signer = match verified_signer {
3005 Some(fp) => fp,
3006 None => {
3007 warn!(%room_id, "OwnerGrant arrived unsigned; dropping");
3008 return;
3009 }
3010 };
3011 if !self.is_owner(room_id, &signer) {
3012 warn!(%signer, %room_id, "OwnerGrant signer isn't an owner; dropping");
3013 return;
3014 }
3015 info!(%signer, %target_fingerprint, %room_id, "OwnerGrant applied");
3016 if let Err(e) =
3017 repo::set_member_role(&self.db, room_id, &target_fingerprint, "owner")
3018 {
3019 warn!(%e, "OwnerGrant: set_member_role failed");
3020 }
3021 }
3022 RoomMessage::BanMember {
3023 room_id: announced_room_id,
3024 target_fingerprint,
3025 } => {
3026 if announced_room_id != room_id {
3027 warn!(payload_room = %announced_room_id, topic_room = %room_id, "BanMember room mismatch");
3028 return;
3029 }
3030 let signer = match verified_signer {
3031 Some(fp) => fp,
3032 None => {
3033 warn!(%room_id, "BanMember arrived unsigned; dropping");
3034 return;
3035 }
3036 };
3037 if !self.is_owner(room_id, &signer) {
3038 warn!(%signer, %room_id, "BanMember signer isn't an owner; dropping");
3039 return;
3040 }
3041 if target_fingerprint == our_fp {
3042 info!(%room_id, %signer, "we were kicked from this room");
3048 self.active_rooms.lock().unwrap().remove(room_id);
3049 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
3050 room_id: room_id.to_string(),
3051 });
3052 return;
3053 }
3054 info!(%signer, %target_fingerprint, %room_id, "BanMember applied");
3055 if let Err(e) = repo::add_room_ban(
3056 &self.db,
3057 room_id,
3058 &target_fingerprint,
3059 &signer,
3060 "", now_unix(),
3062 ) {
3063 warn!(%e, "BanMember: add_room_ban failed");
3064 }
3065 self.evict_banned_member(room_id, &target_fingerprint);
3066 }
3067 RoomMessage::SasInit {
3068 tx_id,
3069 ephemeral_x25519_pubkey_b64,
3070 target_fingerprint,
3071 } => {
3072 if target_fingerprint != our_fp {
3073 return;
3078 }
3079 let signer = match verified_signer {
3080 Some(fp) => fp,
3081 None => {
3082 warn!("SasInit arrived unsigned; dropping");
3083 return;
3084 }
3085 };
3086 let their_pub =
3087 match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
3088 Ok(pk) => pk,
3089 Err(e) => {
3090 warn!(%e, "SasInit: bad x25519 pubkey");
3091 return;
3092 }
3093 };
3094 let tx_id_bytes = match B64.decode(&tx_id) {
3095 Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
3096 let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
3097 arr.copy_from_slice(&b);
3098 arr
3099 }
3100 _ => {
3101 warn!(%tx_id, "SasInit: bad tx_id length");
3102 return;
3103 }
3104 };
3105 let (_, our_secret, our_pub) = crate::crypto::sas::new_session();
3106 let sas_code =
3107 crate::crypto::sas::derive_sas_code(&our_secret, &their_pub, &tx_id_bytes);
3108 self.sas_flows.lock().unwrap().insert(
3109 tx_id.clone(),
3110 SasFlow {
3111 room_id: room_id.to_string(),
3112 partner_fingerprint: signer.clone(),
3113 our_secret,
3114 sas_code: Some(sas_code.clone()),
3115 our_confirmed: false,
3116 their_confirmed: false,
3117 finalized: false,
3118 },
3119 );
3120 let response = RoomMessage::SasResponse {
3123 tx_id: tx_id.clone(),
3124 ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
3125 };
3126 if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
3127 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
3128 self.network
3129 .publish_room_message(room_id.to_string(), bytes)
3130 .await;
3131 }
3132 }
3133 let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
3134 room_id: room_id.to_string(),
3135 partner_fingerprint: signer,
3136 tx_id,
3137 emoji_labels: sas_code.emoji_labels(),
3138 decimal: sas_code.decimal,
3139 });
3140 }
3141 RoomMessage::SasResponse {
3142 tx_id,
3143 ephemeral_x25519_pubkey_b64,
3144 } => {
3145 let signer = match verified_signer {
3146 Some(fp) => fp,
3147 None => {
3148 warn!("SasResponse arrived unsigned; dropping");
3149 return;
3150 }
3151 };
3152 let their_pub =
3153 match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
3154 Ok(pk) => pk,
3155 Err(e) => {
3156 warn!(%e, "SasResponse: bad x25519 pubkey");
3157 return;
3158 }
3159 };
3160 let tx_id_bytes = match B64.decode(&tx_id) {
3161 Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
3162 let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
3163 arr.copy_from_slice(&b);
3164 arr
3165 }
3166 _ => return,
3167 };
3168 let emit = {
3169 let mut flows = self.sas_flows.lock().unwrap();
3170 let flow = match flows.get_mut(&tx_id) {
3171 Some(f) => f,
3172 None => {
3173 warn!(%tx_id, "SasResponse for unknown tx_id");
3174 return;
3175 }
3176 };
3177 if flow.partner_fingerprint != signer {
3178 warn!(
3179 expected = %flow.partner_fingerprint, got = %signer,
3180 "SasResponse signer doesn't match flow's partner; dropping"
3181 );
3182 return;
3183 }
3184 let code = crate::crypto::sas::derive_sas_code(
3185 &flow.our_secret,
3186 &their_pub,
3187 &tx_id_bytes,
3188 );
3189 flow.sas_code = Some(code.clone());
3190 code
3191 };
3192 let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
3193 room_id: room_id.to_string(),
3194 partner_fingerprint: signer,
3195 tx_id,
3196 emoji_labels: emit.emoji_labels(),
3197 decimal: emit.decimal,
3198 });
3199 }
3200 RoomMessage::CodeJoinRequest {
3201 room_id: announced_room_id,
3202 joiner_x25519_pubkey_b64,
3203 code,
3204 } => {
3205 if announced_room_id != room_id {
3206 return;
3207 }
3208 let joiner_fp = match verified_signer {
3209 Some(fp) => fp,
3210 None => {
3211 warn!("CodeJoinRequest unsigned; dropping");
3212 return;
3213 }
3214 };
3215 let our_fp = self.identity.fingerprint().to_string();
3219 if !self.is_owner(room_id, &our_fp) {
3220 return;
3221 }
3222 let now = now_unix();
3224 let (code_ok, our_session_id, wrap_input) = {
3225 let mut rooms = self.active_rooms.lock().unwrap();
3226 let room = match rooms.get_mut(room_id) {
3227 Some(r) => r,
3228 None => return,
3229 };
3230 if room.passphrase_key.is_none() {
3231 warn!("CodeJoinRequest: no passphrase key locally; can't respond");
3232 return;
3233 }
3234 let original_len = room.issued_codes.len();
3235 room.issued_codes.retain(|(c, exp)| !(c == &code && *exp > now));
3236 let matched = room.issued_codes.len() < original_len;
3237 if !matched {
3238 info!(%joiner_fp, "CodeJoinRequest: code invalid or expired; ignoring");
3239 return;
3240 }
3241 let crypto = room.crypto.as_ref().unwrap();
3242 (
3243 true,
3244 crypto.our_session_id(),
3245 crypto.our_session_key_b64(),
3246 )
3247 };
3248 let _ = code_ok;
3249 let their_pub = match crate::crypto::sas::parse_pubkey(&joiner_x25519_pubkey_b64) {
3251 Ok(pk) => pk,
3252 Err(e) => {
3253 warn!(%e, "CodeJoinRequest: bad pubkey");
3254 return;
3255 }
3256 };
3257 use x25519_dalek::{PublicKey, StaticSecret};
3258 let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
3259 let our_pub = PublicKey::from(&our_secret);
3260 let shared = our_secret.diffie_hellman(&their_pub);
3261 let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
3263 let mut wrap_key = [0u8; passphrase::KEY_LEN];
3264 hk.expand(b"huddle-code-join-v1", &mut wrap_key)
3265 .expect("32 bytes is within HKDF limits");
3266 let wrapped = match passphrase::wrap(wrap_input.as_bytes(), &wrap_key) {
3269 Ok(w) => w,
3270 Err(e) => {
3271 warn!(%e, "CodeJoinRequest: wrap failed");
3272 return;
3273 }
3274 };
3275 let response = RoomMessage::CodeJoinResponse {
3276 room_id: room_id.to_string(),
3277 target_fingerprint: joiner_fp.clone(),
3278 owner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
3279 owner_session_id: our_session_id,
3280 wrapped_session_key_b64: wrapped,
3281 nonce_b64: String::new(), };
3283 if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
3284 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
3285 self.network
3286 .publish_room_message(room_id.to_string(), bytes)
3287 .await;
3288 }
3289 }
3290 info!(%joiner_fp, %room_id, "issued CodeJoinResponse");
3291 }
3292 RoomMessage::CodeJoinResponse {
3293 room_id: announced_room_id,
3294 target_fingerprint,
3295 owner_x25519_pubkey_b64,
3296 owner_session_id,
3297 wrapped_session_key_b64,
3298 nonce_b64: _,
3299 } => {
3300 if announced_room_id != room_id || target_fingerprint != our_fp {
3301 return;
3302 }
3303 let owner_fp = match verified_signer {
3304 Some(fp) => fp,
3305 None => {
3306 warn!("CodeJoinResponse unsigned; dropping");
3307 return;
3308 }
3309 };
3310 let our_secret = match self
3311 .pending_code_secrets
3312 .lock()
3313 .unwrap()
3314 .remove(&(room_id.to_string(), our_fp.clone()))
3315 {
3316 Some(s) => s,
3317 None => {
3318 warn!(%room_id, "CodeJoinResponse with no pending code-join state");
3319 return;
3320 }
3321 };
3322 let owner_pub = match crate::crypto::sas::parse_pubkey(&owner_x25519_pubkey_b64) {
3323 Ok(pk) => pk,
3324 Err(e) => {
3325 warn!(%e, "CodeJoinResponse: bad owner pubkey");
3326 return;
3327 }
3328 };
3329 let shared = our_secret.diffie_hellman(&owner_pub);
3330 let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
3331 let mut wrap_key = [0u8; passphrase::KEY_LEN];
3332 hk.expand(b"huddle-code-join-v1", &mut wrap_key)
3333 .expect("32 bytes within HKDF limits");
3334 let session_key_bytes =
3335 match passphrase::unwrap(&wrapped_session_key_b64, &wrap_key) {
3336 Ok(b) => b,
3337 Err(e) => {
3338 warn!(%e, "CodeJoinResponse: unwrap failed");
3339 return;
3340 }
3341 };
3342 let session_key_str = match String::from_utf8(session_key_bytes) {
3343 Ok(s) => s,
3344 Err(e) => {
3345 warn!(%e, "CodeJoinResponse: session key wasn't valid utf8");
3346 return;
3347 }
3348 };
3349 let mut rooms = self.active_rooms.lock().unwrap();
3351 if let Some(room) = rooms.get_mut(room_id) {
3352 if let Some(crypto) = room.crypto.as_mut() {
3353 if let Err(e) =
3354 crypto.add_inbound_session(&owner_fp, &session_key_str)
3355 {
3356 warn!(%e, "CodeJoinResponse: add_inbound_session failed");
3357 } else {
3358 info!(%room_id, %owner_fp, %owner_session_id, "code-join completed; can decrypt owner's messages");
3359 room.members.insert(owner_fp.clone());
3360 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
3361 room_id: room_id.to_string(),
3362 fingerprint: owner_fp,
3363 });
3364 }
3365 }
3366 }
3367 }
3368 RoomMessage::JoinRefused {
3369 room_id: announced_room_id,
3370 target_fingerprint,
3371 reason,
3372 } => {
3373 if announced_room_id != room_id || target_fingerprint != our_fp {
3374 return;
3375 }
3376 let _ = self.app_event_tx.send(AppEvent::Error {
3380 description: format!("join refused: {reason}"),
3381 });
3382 }
3383 RoomMessage::SasConfirm { tx_id, matched } => {
3384 let signer = match verified_signer {
3385 Some(fp) => fp,
3386 None => return,
3387 };
3388 let (room_id_done, partner_fp_done, both_done) = {
3389 let mut flows = self.sas_flows.lock().unwrap();
3390 let flow = match flows.get_mut(&tx_id) {
3391 Some(f) => f,
3392 None => return,
3393 };
3394 if flow.partner_fingerprint != signer {
3395 return;
3396 }
3397 if !matched {
3398 let _ = flow;
3400 flows.remove(&tx_id);
3401 return;
3402 }
3403 flow.their_confirmed = true;
3404 if flow.our_confirmed && flow.their_confirmed && !flow.finalized {
3411 flow.finalized = true;
3412 (
3413 Some(flow.room_id.clone()),
3414 Some(flow.partner_fingerprint.clone()),
3415 true,
3416 )
3417 } else {
3418 (None, None, false)
3419 }
3420 };
3421 if both_done {
3422 if let (Some(rid), Some(pfp)) = (room_id_done, partner_fp_done) {
3423 if let Err(e) = self.finish_sas(&tx_id, &rid, &pfp).await {
3424 warn!(%e, "finish_sas failed");
3425 }
3426 }
3427 }
3428 }
3429 RoomMessage::ProfileUpdate {
3430 sender_fingerprint,
3431 username,
3432 updated_at,
3433 } => {
3434 let signer = match verified_signer {
3440 Some(fp) => fp,
3441 None => {
3442 warn!(
3443 sender = %sender_fingerprint,
3444 "dropping unsigned ProfileUpdate"
3445 );
3446 return;
3447 }
3448 };
3449 if signer != sender_fingerprint {
3450 warn!(
3451 signer = %signer,
3452 claimed = %sender_fingerprint,
3453 "dropping ProfileUpdate with signer != sender"
3454 );
3455 return;
3456 }
3457 if let Err(e) = repo::upsert_peer_profile(
3458 &self.db,
3459 &sender_fingerprint,
3460 username.as_deref(),
3461 updated_at,
3462 ) {
3463 warn!(%e, "upsert_peer_profile failed");
3464 return;
3465 }
3466 let _ = self.app_event_tx.send(AppEvent::PeerProfileUpdated {
3467 fingerprint: sender_fingerprint,
3468 username,
3469 });
3470 }
3471 }
3472 }
3473
3474 pub async fn send_file(&self, room_id: &str, path: &Path) -> Result<String> {
3482 let bytes = std::fs::read(path)?;
3483 let name = path
3484 .file_name()
3485 .map(|n| n.to_string_lossy().to_string())
3486 .unwrap_or_else(|| "untitled".into());
3487 let mime = crate::files::guess_mime(&name);
3488 let original_path = path.to_path_buf();
3489
3490 let (room_encrypted, mut maybe_session_id, encrypted_meta_opt, wire_bytes) = {
3491 let mut rooms = self.active_rooms.lock().unwrap();
3492 let room = rooms
3493 .get_mut(room_id)
3494 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
3495 if room.read_only {
3500 return Err(HuddleError::Other(
3501 "this room is read-only — you can't send files".into(),
3502 ));
3503 }
3504 if room.info.encrypted {
3505 let crypto = room
3506 .crypto
3507 .as_mut()
3508 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
3509 let (ciphertext, meta) = file_encryption::encrypt_file(&bytes, crypto)?;
3510 (true, Some(meta.megolm_session_id.clone()), Some(meta), ciphertext)
3511 } else {
3512 (false, None, None, bytes)
3513 }
3514 };
3515 let _ = &mut maybe_session_id; let plan =
3518 self.file_manager
3519 .prepare_outgoing_from_bytes(&name, mime.clone(), wire_bytes)?;
3520 let file_id = plan.file_id.clone();
3521 let total = plan.chunks.len() as u32;
3522 let our_fp = self.identity.fingerprint().to_string();
3523
3524 let attachment = StoredAttachment {
3525 id: 0,
3526 room_id: room_id.to_string(),
3527 message_id: None,
3528 sender_fingerprint: our_fp.clone(),
3529 file_id: file_id.clone(),
3530 name: name.clone(),
3531 mime: mime.clone(),
3532 size_bytes: plan.size_bytes as i64,
3533 status: AttachmentStatus::Ready,
3534 cache_path: Some(self.file_manager.cache_path(&file_id).to_string_lossy().into()),
3535 saved_path: Some(original_path.to_string_lossy().into()),
3536 error: None,
3537 encrypted: room_encrypted,
3538 wrapped_key: encrypted_meta_opt.as_ref().map(|m| m.wrapped_key_b64.clone()),
3539 nonce: encrypted_meta_opt.as_ref().map(|m| m.nonce_b64.clone()),
3540 megolm_session_id: encrypted_meta_opt
3541 .as_ref()
3542 .map(|m| m.megolm_session_id.clone()),
3543 content_hash: encrypted_meta_opt.as_ref().map(|m| m.content_hash.clone()),
3544 created_at: now_unix(),
3545 };
3546 repo::upsert_attachment(&self.db, &attachment)?;
3547 let _ = self.app_event_tx.send(AppEvent::FileOffered {
3548 room_id: room_id.to_string(),
3549 file_id: file_id.clone(),
3550 name: name.clone(),
3551 size_bytes: plan.size_bytes,
3552 sender_fingerprint: our_fp.clone(),
3553 });
3554
3555 let offer = RoomMessage::FileOffer {
3562 sender_fingerprint: our_fp.clone(),
3563 file_id: file_id.clone(),
3564 name,
3565 size_bytes: plan.size_bytes,
3566 mime,
3567 chunk_count: total,
3568 encrypted_meta: encrypted_meta_opt,
3569 };
3570 if let Ok(env) = crate::crypto::sign_message(&self.identity, &offer) {
3571 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
3572 self.network
3573 .publish_room_message(room_id.to_string(), bytes)
3574 .await;
3575 }
3576 }
3577
3578 let net = self.network.clone();
3581 let room = room_id.to_string();
3582 let our = our_fp.clone();
3583 let fid = file_id.clone();
3584 let chunks = plan.chunks.clone();
3585 tokio::spawn(async move {
3586 for (i, data) in chunks.iter().enumerate() {
3587 let msg = RoomMessage::FileChunk {
3588 sender_fingerprint: our.clone(),
3589 file_id: fid.clone(),
3590 chunk_index: i as u32,
3591 total_chunks: total,
3592 data_b64: B64.encode(data),
3593 };
3594 if let Ok(bytes) = encode_wire(&msg) {
3595 net.publish_room_message(room.clone(), bytes).await;
3596 }
3597 tokio::time::sleep(Duration::from_millis(40)).await;
3598 }
3599 });
3600
3601 Ok(file_id)
3602 }
3603
3604 pub async fn save_to_downloads(&self, room_id: &str, file_id: &str) -> Result<PathBuf> {
3607 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
3608 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
3609 if !matches!(
3610 attachment.status,
3611 AttachmentStatus::Ready | AttachmentStatus::Saved
3612 ) {
3613 return Err(HuddleError::Other(format!(
3614 "attachment is not ready (status={})",
3615 attachment.status.as_str()
3616 )));
3617 }
3618 let plaintext = if attachment.encrypted
3623 && attachment.sender_fingerprint == self.identity.fingerprint()
3624 {
3625 match attachment
3626 .saved_path
3627 .as_deref()
3628 .filter(|p| Path::new(p).exists())
3629 {
3630 Some(src) => std::fs::read(src)?,
3631 None => {
3632 return Err(HuddleError::Other(
3633 "your original file has moved or been deleted — it can't be \
3634 recovered from the encrypted cache"
3635 .into(),
3636 ));
3637 }
3638 }
3639 } else {
3640 let cached = self.file_manager.read_cache(file_id)?;
3641 if attachment.encrypted {
3642 let meta = EncryptedFileMeta {
3643 megolm_session_id: attachment
3644 .megolm_session_id
3645 .clone()
3646 .ok_or_else(|| HuddleError::Other("missing megolm_session_id".into()))?,
3647 wrapped_key_b64: attachment
3648 .wrapped_key
3649 .clone()
3650 .ok_or_else(|| HuddleError::Other("missing wrapped_key".into()))?,
3651 nonce_b64: attachment
3652 .nonce
3653 .clone()
3654 .ok_or_else(|| HuddleError::Other("missing nonce".into()))?,
3655 content_hash: attachment
3656 .content_hash
3657 .clone()
3658 .ok_or_else(|| HuddleError::Other("missing content_hash".into()))?,
3659 };
3660 self.decrypt_attachment(
3661 room_id,
3662 &attachment.sender_fingerprint,
3663 &cached,
3664 &meta,
3665 )?
3666 } else {
3667 cached
3668 }
3669 };
3670 let saved = self.file_manager.write_to_downloads(&attachment.name, &plaintext)?;
3671 repo::update_attachment_paths(
3672 &self.db,
3673 room_id,
3674 file_id,
3675 None,
3676 Some(&saved.to_string_lossy()),
3677 )?;
3678 repo::update_attachment_status(&self.db, room_id, file_id, AttachmentStatus::Saved, None)?;
3679 let _ = self.app_event_tx.send(AppEvent::FileSaved {
3680 file_id: file_id.into(),
3681 path: saved.to_string_lossy().into(),
3682 });
3683 Ok(saved)
3684 }
3685
3686 pub async fn cancel_transfer(&self, room_id: &str, file_id: &str) -> Result<()> {
3688 self.file_manager.cancel_incoming(file_id);
3689 repo::update_attachment_status(
3690 &self.db,
3691 room_id,
3692 file_id,
3693 AttachmentStatus::Cancelled,
3694 None,
3695 )?;
3696 Ok(())
3697 }
3698
3699 pub fn open_saved(&self, room_id: &str, file_id: &str) -> Result<()> {
3701 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
3702 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
3703 let path = attachment
3704 .saved_path
3705 .ok_or_else(|| HuddleError::Other("not saved yet — press Enter to save first".into()))?;
3706 open_with_system(&path)
3707 }
3708
3709 pub fn list_room_attachments(&self, room_id: &str) -> Result<Vec<StoredAttachment>> {
3710 repo::list_room_attachments(&self.db, room_id)
3711 }
3712
3713 pub fn set_member_verified(
3717 &self,
3718 room_id: &str,
3719 fingerprint: &str,
3720 verified: bool,
3721 ) -> Result<()> {
3722 let members = repo::list_room_members(&self.db, room_id).unwrap_or_default();
3727 if !members.iter().any(|m| m.fingerprint == fingerprint) {
3728 repo::upsert_room_member(
3729 &self.db,
3730 &StoredRoomMember {
3731 room_id: room_id.to_string(),
3732 peer_id: String::new(),
3733 fingerprint: fingerprint.to_string(),
3734 last_seen: Some(now_unix()),
3735 verified,
3736 ed25519_pubkey: None,
3737 role: "member".into(),
3738 },
3739 )?;
3740 }
3741 repo::set_member_verified(&self.db, room_id, fingerprint, verified)
3742 }
3743
3744 pub fn verified_fingerprints(&self, room_id: &str) -> Vec<String> {
3745 repo::list_verified_fingerprints(&self.db, room_id).unwrap_or_default()
3746 }
3747
3748 pub fn is_owner(&self, room_id: &str, fingerprint: &str) -> bool {
3751 repo::list_room_owners(&self.db, room_id)
3752 .unwrap_or_default()
3753 .iter()
3754 .any(|fp| fp == fingerprint)
3755 }
3756
3757 pub fn we_are_owner(&self, room_id: &str) -> bool {
3758 self.is_owner(room_id, &self.identity.fingerprint().to_string())
3759 }
3760
3761 pub fn room_owners(&self, room_id: &str) -> Vec<String> {
3764 repo::list_room_owners(&self.db, room_id).unwrap_or_default()
3765 }
3766
3767 pub fn has_master_passphrase(&self) -> bool {
3773 self.session_persist_key != [0u8; 32]
3774 }
3775
3776 pub fn verified_only_inbound(&self) -> bool {
3779 repo::get_setting(&self.db, "verified_only_inbound")
3780 .unwrap_or(None)
3781 .map(|v| v == "1")
3782 .unwrap_or(false)
3783 }
3784
3785 pub fn set_verified_only_inbound(&self, on: bool) -> Result<()> {
3786 repo::set_setting(&self.db, "verified_only_inbound", if on { "1" } else { "0" })
3787 }
3788
3789 pub fn mdns_enabled(&self) -> bool {
3799 repo::get_setting(&self.db, "mdns_enabled")
3800 .unwrap_or(None)
3801 .map(|v| v == "1")
3802 .unwrap_or(true)
3803 }
3804
3805 pub fn set_mdns_enabled(&self, on: bool) -> Result<()> {
3806 repo::set_setting(&self.db, "mdns_enabled", if on { "1" } else { "0" })
3807 }
3808
3809 pub fn notifications_enabled(&self) -> bool {
3815 repo::get_setting(&self.db, "notifications_enabled")
3816 .unwrap_or(None)
3817 .map(|v| v == "1")
3818 .unwrap_or(true)
3819 }
3820
3821 pub fn set_notifications_enabled(&self, on: bool) -> Result<()> {
3822 repo::set_setting(
3823 &self.db,
3824 "notifications_enabled",
3825 if on { "1" } else { "0" },
3826 )
3827 }
3828
3829 pub fn safety_code(&self) -> String {
3834 crate::identity::safety_code(&self.identity.public_bytes())
3835 }
3836
3837 pub fn room_verified_only(&self, room_id: &str) -> bool {
3842 repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
3843 }
3844
3845 pub fn set_room_verified_only(&self, room_id: &str, on: bool) -> Result<()> {
3846 repo::set_room_verified_only(&self.db, room_id, on)
3847 }
3848
3849 pub fn onboarding_seen(&self) -> bool {
3851 repo::is_onboarding_seen(&self.db).unwrap_or(true)
3852 }
3853
3854 pub fn mark_onboarding_seen(&self) -> Result<()> {
3855 repo::mark_onboarding_seen(&self.db)
3856 }
3857
3858 pub fn last_seen_onboarding_version(&self) -> Option<String> {
3862 repo::get_last_seen_onboarding_version(&self.db).unwrap_or(None)
3863 }
3864
3865 pub fn set_last_seen_onboarding_version(&self, version: &str) -> Result<()> {
3866 repo::set_last_seen_onboarding_version(&self.db, version)
3867 }
3868
3869 pub fn update_check_enabled(&self) -> Option<bool> {
3872 repo::get_update_check_enabled(&self.db).unwrap_or(None)
3873 }
3874
3875 pub fn set_update_check_enabled(&self, enabled: bool) -> Result<()> {
3876 repo::set_update_check_enabled(&self.db, enabled)
3877 }
3878
3879 pub fn last_update_check_at(&self) -> i64 {
3882 repo::get_setting(&self.db, "last_update_check_at")
3883 .ok()
3884 .flatten()
3885 .and_then(|s| s.parse().ok())
3886 .unwrap_or(0)
3887 }
3888
3889 pub fn set_last_update_check_at(&self, ts: i64) -> Result<()> {
3890 repo::set_setting(&self.db, "last_update_check_at", &ts.to_string())
3891 }
3892
3893 pub fn last_known_remote_version(&self) -> Option<String> {
3897 repo::get_setting(&self.db, "last_known_remote_version")
3898 .ok()
3899 .flatten()
3900 }
3901
3902 pub fn set_last_known_remote_version(&self, v: &str) -> Result<()> {
3903 repo::set_setting(&self.db, "last_known_remote_version", v)
3904 }
3905
3906 pub async fn grant_owner(&self, room_id: &str, target_fingerprint: &str) -> Result<()> {
3910 let our_fp = self.identity.fingerprint().to_string();
3911 if !self.is_owner(room_id, &our_fp) {
3912 return Err(HuddleError::Other(
3913 "only an owner can grant owner".into(),
3914 ));
3915 }
3916 let msg = RoomMessage::OwnerGrant {
3917 room_id: room_id.to_string(),
3918 target_fingerprint: target_fingerprint.to_string(),
3919 };
3920 let env = crate::crypto::sign_message(&self.identity, &msg)?;
3921 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
3922 self.network
3923 .publish_room_message(room_id.to_string(), bytes)
3924 .await;
3925 repo::set_member_role(&self.db, room_id, target_fingerprint, "owner")?;
3927 Ok(())
3928 }
3929
3930 pub async fn kick_member(
3941 &self,
3942 room_id: &str,
3943 target_fingerprint: &str,
3944 ) -> Result<String> {
3945 let our_fp = self.identity.fingerprint().to_string();
3946 if !self.is_owner(room_id, &our_fp) {
3947 return Err(HuddleError::Other("only an owner can kick".into()));
3948 }
3949 if target_fingerprint == our_fp {
3950 return Err(HuddleError::Other("can't kick yourself".into()));
3951 }
3952 let info = self
3953 .active_rooms
3954 .lock()
3955 .unwrap()
3956 .get(room_id)
3957 .map(|r| r.info.clone())
3958 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
3959 if !info.encrypted {
3960 let msg = RoomMessage::BanMember {
3964 room_id: room_id.to_string(),
3965 target_fingerprint: target_fingerprint.to_string(),
3966 };
3967 let env = crate::crypto::sign_message(&self.identity, &msg)?;
3968 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
3969 self.network
3970 .publish_room_message(room_id.to_string(), bytes)
3971 .await;
3972 repo::add_room_ban(
3973 &self.db,
3974 room_id,
3975 target_fingerprint,
3976 &our_fp,
3977 &env.signature_b64,
3978 now_unix(),
3979 )?;
3980 self.evict_banned_member(room_id, target_fingerprint);
3981 return Ok(String::new());
3982 }
3983 let new_passphrase = generate_join_passphrase();
3985 let msg = RoomMessage::BanMember {
3986 room_id: room_id.to_string(),
3987 target_fingerprint: target_fingerprint.to_string(),
3988 };
3989 let env = crate::crypto::sign_message(&self.identity, &msg)?;
3990 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
3991 self.network
3992 .publish_room_message(room_id.to_string(), bytes)
3993 .await;
3994 repo::add_room_ban(
3995 &self.db,
3996 room_id,
3997 target_fingerprint,
3998 &our_fp,
3999 &env.signature_b64,
4000 now_unix(),
4001 )?;
4002 self.evict_banned_member(room_id, target_fingerprint);
4003 self.rotate_room(room_id, &new_passphrase).await?;
4006 Ok(new_passphrase)
4007 }
4008
4009 pub fn generate_join_code(&self, room_id: &str) -> Result<String> {
4016 let our_fp = self.identity.fingerprint().to_string();
4017 if !self.is_owner(room_id, &our_fp) {
4018 return Err(HuddleError::Other(
4019 "only an owner can issue join codes".into(),
4020 ));
4021 }
4022 let code = generate_alphanumeric_code(8);
4023 let expires_at = now_unix() + 10 * 60;
4024 let mut rooms = self.active_rooms.lock().unwrap();
4025 let room = rooms
4026 .get_mut(room_id)
4027 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
4028 let now = now_unix();
4030 room.issued_codes.retain(|(_, exp)| *exp > now);
4031 room.issued_codes.push((code.clone(), expires_at));
4032 Ok(code)
4033 }
4034
4035 pub async fn join_room_with_code(
4042 &self,
4043 room_id: &str,
4044 code: &str,
4045 ) -> Result<()> {
4046 let info = {
4048 let d = self.discovered_rooms.lock().unwrap().get(room_id).cloned();
4049 match d {
4050 Some(d) => StoredRoom {
4051 id: room_id.to_string(),
4052 name: d.name,
4053 creator_fingerprint: d.creator_fingerprint,
4054 encrypted: d.encrypted,
4055 passphrase_salt: None, created_at: now_unix(),
4057 last_active: Some(now_unix()),
4058 kind: d.kind,
4061 },
4062 None => {
4063 return Err(HuddleError::Other(format!(
4064 "room {room_id} not visible — wait for an announcement"
4065 )))
4066 }
4067 }
4068 };
4069 if !info.encrypted {
4070 return Err(HuddleError::Other(
4071 "code-join only applies to encrypted rooms".into(),
4072 ));
4073 }
4074 let our_fp = self.identity.fingerprint().to_string();
4075 use x25519_dalek::{PublicKey, StaticSecret};
4078 let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
4079 let our_pub = PublicKey::from(&our_secret);
4080 let key = (room_id.to_string(), our_fp.clone());
4085 self.pending_code_secrets
4086 .lock()
4087 .unwrap()
4088 .insert(key.clone(), our_secret);
4089 let map = self.pending_code_secrets.clone();
4094 let tx = self.app_event_tx.clone();
4095 let timeout_room = room_id.to_string();
4096 tokio::spawn(async move {
4097 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
4098 let still_pending = map.lock().unwrap().remove(&key).is_some();
4099 if still_pending {
4100 let _ = tx.send(AppEvent::CodeJoinTimedOut {
4101 room_id: timeout_room,
4102 reason: "no response from owner — code may be wrong or expired".into(),
4103 });
4104 }
4105 });
4106 repo::insert_room(&self.db, &info)?;
4113 self.active_rooms.lock().unwrap().insert(
4116 room_id.to_string(),
4117 ActiveRoom {
4118 info: info.clone(),
4119 crypto: Some(RoomCrypto::new_for_room(
4120 self.db.clone(),
4121 room_id.to_string(),
4122 our_fp.clone(),
4123 self.session_persist_key,
4124 )?),
4125 passphrase_key: None,
4126 members: {
4127 let mut s = HashSet::new();
4128 s.insert(our_fp.clone());
4129 s
4130 },
4131 typers: HashMap::new(),
4132 read_only: true,
4133 issued_codes: Vec::new(),
4134 },
4135 );
4136 self.network.subscribe_room(room_id.to_string()).await;
4137 let req = RoomMessage::CodeJoinRequest {
4139 room_id: room_id.to_string(),
4140 joiner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
4141 code: code.to_string(),
4142 };
4143 let env = crate::crypto::sign_message(&self.identity, &req)?;
4144 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
4145 self.network
4146 .publish_room_message(room_id.to_string(), bytes)
4147 .await;
4148 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
4151 room_id: room_id.to_string(),
4152 });
4153 Ok(())
4154 }
4155
4156 pub async fn sas_start(&self, room_id: &str, target_fingerprint: &str) -> Result<String> {
4162 let (tx_id_bytes, our_secret, our_pub) = crate::crypto::sas::new_session();
4163 let tx_id = B64.encode(tx_id_bytes);
4164 let msg = RoomMessage::SasInit {
4165 tx_id: tx_id.clone(),
4166 ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
4167 target_fingerprint: target_fingerprint.to_string(),
4168 };
4169 let env = crate::crypto::sign_message(&self.identity, &msg)?;
4170 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
4171 self.sas_flows.lock().unwrap().insert(
4172 tx_id.clone(),
4173 SasFlow {
4174 room_id: room_id.to_string(),
4175 partner_fingerprint: target_fingerprint.to_string(),
4176 our_secret,
4177 sas_code: None,
4178 our_confirmed: false,
4179 their_confirmed: false,
4180 finalized: false,
4181 },
4182 );
4183 self.network
4184 .publish_room_message(room_id.to_string(), bytes)
4185 .await;
4186 Ok(tx_id)
4187 }
4188
4189 pub async fn sas_match(&self, tx_id: &str) -> Result<()> {
4193 let (room_id, partner_fp, both_done) = {
4194 let mut flows = self.sas_flows.lock().unwrap();
4195 let flow = flows
4196 .get_mut(tx_id)
4197 .ok_or_else(|| HuddleError::Other("unknown SAS tx_id".into()))?;
4198 flow.our_confirmed = true;
4199 let do_finish = flow.our_confirmed && flow.their_confirmed && !flow.finalized;
4203 if do_finish {
4204 flow.finalized = true;
4205 }
4206 (
4207 flow.room_id.clone(),
4208 flow.partner_fingerprint.clone(),
4209 do_finish,
4210 )
4211 };
4212 let msg = RoomMessage::SasConfirm {
4213 tx_id: tx_id.to_string(),
4214 matched: true,
4215 };
4216 let env = crate::crypto::sign_message(&self.identity, &msg)?;
4217 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
4218 self.network
4219 .publish_room_message(room_id.clone(), bytes)
4220 .await;
4221 if both_done {
4222 self.finish_sas(tx_id, &room_id, &partner_fp).await?;
4223 }
4224 Ok(())
4225 }
4226
4227 pub fn sas_cancel(&self, tx_id: &str) {
4231 self.sas_flows.lock().unwrap().remove(tx_id);
4232 }
4233
4234 async fn finish_sas(
4237 &self,
4238 tx_id: &str,
4239 room_id: &str,
4240 partner_fingerprint: &str,
4241 ) -> Result<()> {
4242 repo::set_member_verified(&self.db, room_id, partner_fingerprint, true)?;
4243 repo::add_verified_peer(&self.db, partner_fingerprint, now_unix())?;
4244 self.sas_flows.lock().unwrap().remove(tx_id);
4245 let _ = self.app_event_tx.send(AppEvent::SasVerified {
4246 room_id: room_id.to_string(),
4247 partner_fingerprint: partner_fingerprint.to_string(),
4248 });
4249 Ok(())
4250 }
4251
4252 fn evict_banned_member(&self, room_id: &str, fingerprint: &str) {
4257 if let Some(room) = self.active_rooms.lock().unwrap().get_mut(room_id) {
4258 room.members.remove(fingerprint);
4259 }
4260 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
4261 room_id: room_id.to_string(),
4262 fingerprint: fingerprint.to_string(),
4263 });
4264 }
4265
4266 pub fn display_name(&self) -> Option<String> {
4267 repo::get_display_name(&self.db).unwrap_or(None)
4268 }
4269
4270 pub fn set_display_name(&self, name: Option<&str>) -> Result<()> {
4271 repo::set_display_name(&self.db, name)
4272 }
4273
4274 pub async fn set_username(&self, name: Option<&str>) -> Result<()> {
4280 repo::set_display_name(&self.db, name)?;
4281 let msg = RoomMessage::ProfileUpdate {
4282 sender_fingerprint: self.identity.fingerprint().to_string(),
4283 username: name.map(|s| s.to_string()),
4284 updated_at: now_unix_ms(),
4285 };
4286 let env = crate::crypto::sign_message(&self.identity, &msg)?;
4287 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
4288 let rooms: Vec<String> = self.active_rooms.lock().unwrap().keys().cloned().collect();
4289 for room_id in rooms {
4290 self.network
4291 .publish_room_message(room_id, bytes.clone())
4292 .await;
4293 }
4294 Ok(())
4295 }
4296
4297 pub fn lookup_username(&self, fingerprint: &str) -> Option<String> {
4302 repo::get_peer_username(&self.db, fingerprint).unwrap_or(None)
4303 }
4304
4305 pub fn lookup_member_display_name(&self, fingerprint: &str) -> Option<String> {
4309 self.lookup_username(fingerprint)
4310 }
4311
4312 pub fn peers_with_username(&self, username: &str) -> Vec<String> {
4320 repo::find_peers_by_username(&self.db, username).unwrap_or_default()
4321 }
4322
4323 pub fn is_room_muted(&self, room_id: &str) -> bool {
4324 repo::is_room_muted(&self.db, room_id).unwrap_or(false)
4325 }
4326
4327 pub fn list_room_bans(&self, room_id: &str) -> Vec<String> {
4332 repo::list_room_bans(&self.db, room_id).unwrap_or_default()
4333 }
4334
4335 pub fn list_verified_peers(&self) -> Vec<String> {
4341 repo::list_verified_peers(&self.db).unwrap_or_default()
4342 }
4343
4344 pub fn list_blocked_peers(&self) -> Vec<String> {
4345 repo::list_blocked_peers(&self.db).unwrap_or_default()
4346 }
4347
4348 pub fn unblock_peer(&self, fingerprint: &str) -> Result<()> {
4352 repo::unblock_peer(&self.db, fingerprint)
4353 }
4354
4355 pub fn block_peer(&self, fingerprint: &str) -> Result<()> {
4359 repo::block_peer(&self.db, fingerprint, now_unix())
4360 }
4361
4362 pub fn is_room_read_only(&self, room_id: &str) -> bool {
4368 self.active_rooms
4369 .lock()
4370 .unwrap()
4371 .get(room_id)
4372 .map(|r| r.read_only)
4373 .unwrap_or(false)
4374 }
4375
4376 pub fn set_room_muted(&self, room_id: &str, muted: bool) -> Result<()> {
4377 repo::set_room_muted(&self.db, room_id, muted)
4378 }
4379
4380 pub async fn broadcast_typing(&self, room_id: &str) {
4383 if !self.active_rooms.lock().unwrap().contains_key(room_id) {
4384 return;
4385 }
4386 let msg = RoomMessage::Typing {
4387 sender_fingerprint: self.identity.fingerprint().to_string(),
4388 };
4389 if let Ok(bytes) = encode_wire(&msg) {
4390 self.network
4391 .publish_room_message(room_id.to_string(), bytes)
4392 .await;
4393 }
4394 }
4395
4396 pub fn typers_in_room(&self, room_id: &str) -> Vec<String> {
4399 let now = now_unix();
4400 let mut rooms = self.active_rooms.lock().unwrap();
4401 let room = match rooms.get_mut(room_id) {
4402 Some(r) => r,
4403 None => return Vec::new(),
4404 };
4405 room.typers.retain(|_, exp| *exp > now);
4406 let mut v: Vec<String> = room.typers.keys().cloned().collect();
4407 v.sort();
4408 v
4409 }
4410
4411 pub async fn rotate_room(&self, room_id: &str, new_passphrase: &str) -> Result<()> {
4421 if new_passphrase.is_empty() {
4422 return Err(HuddleError::Other("new passphrase is empty".into()));
4423 }
4424 let new_salt = passphrase::random_salt();
4425 let new_key = passphrase::derive_key(new_passphrase, &new_salt)?;
4426
4427 let info = {
4428 let mut rooms = self.active_rooms.lock().unwrap();
4429 let room = rooms
4430 .get_mut(room_id)
4431 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
4432 if !room.info.encrypted {
4433 return Err(HuddleError::Other(
4434 "rotation only applies to encrypted rooms".into(),
4435 ));
4436 }
4437 let new_crypto = RoomCrypto::new_for_room(
4439 self.db.clone(),
4440 room_id.to_string(),
4441 self.identity.fingerprint().to_string(),
4442 self.session_persist_key,
4443 )?;
4444 room.crypto = Some(new_crypto);
4445 room.passphrase_key = Some(new_key);
4446 room.info.passphrase_salt = Some(new_salt.to_vec());
4447 room.info.clone()
4448 };
4449
4450 let rot = RoomMessage::RotateRoomKey {
4456 rotator_fingerprint: self.identity.fingerprint().to_string(),
4457 new_salt: new_salt.to_vec(),
4458 };
4459 if let Ok(env) = crate::crypto::sign_message(&self.identity, &rot) {
4463 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
4464 self.network
4465 .publish_room_message(room_id.to_string(), bytes)
4466 .await;
4467 }
4468 }
4469 if let Err(e) = self.broadcast_member_announce(room_id).await {
4471 warn!(%e, "rotate: broadcast announce failed");
4472 }
4473
4474 repo::insert_room(&self.db, &info)?;
4476 Ok(())
4477 }
4478
4479 pub async fn accept_rotation(
4483 &self,
4484 room_id: &str,
4485 new_salt: &[u8],
4486 new_passphrase: &str,
4487 ) -> Result<()> {
4488 let new_key = passphrase::derive_key(new_passphrase, new_salt)?;
4489 let info = {
4490 let mut rooms = self.active_rooms.lock().unwrap();
4491 let room = rooms
4492 .get_mut(room_id)
4493 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
4494 room.passphrase_key = Some(new_key);
4495 room.info.passphrase_salt = Some(new_salt.to_vec());
4496 room.info.clone()
4497 };
4498 let req = RoomMessage::SessionKeyRequest {
4502 requester_fingerprint: self.identity.fingerprint().to_string(),
4503 };
4504 if let Ok(bytes) = encode_wire(&req) {
4505 self.network
4506 .publish_room_message(room_id.to_string(), bytes)
4507 .await;
4508 }
4509 repo::insert_room(&self.db, &info)?;
4510 Ok(())
4511 }
4512
4513 #[allow(clippy::too_many_arguments)]
4518 fn handle_file_offer(
4519 &self,
4520 room_id: &str,
4521 sender_fingerprint: String,
4522 file_id: String,
4523 name: String,
4524 size_bytes: u64,
4525 mime: Option<String>,
4526 _chunk_count: u32,
4527 encrypted_meta: Option<EncryptedFileMeta>,
4528 ) {
4529 let encrypted = encrypted_meta.is_some();
4530 let attachment = StoredAttachment {
4531 id: 0,
4532 room_id: room_id.to_string(),
4533 message_id: None,
4534 sender_fingerprint: sender_fingerprint.clone(),
4535 file_id: file_id.clone(),
4536 name: name.clone(),
4537 mime,
4538 size_bytes: size_bytes as i64,
4539 status: AttachmentStatus::Offered,
4540 cache_path: None,
4541 saved_path: None,
4542 error: None,
4543 encrypted,
4544 wrapped_key: encrypted_meta.as_ref().map(|m| m.wrapped_key_b64.clone()),
4545 nonce: encrypted_meta.as_ref().map(|m| m.nonce_b64.clone()),
4546 megolm_session_id: encrypted_meta.as_ref().map(|m| m.megolm_session_id.clone()),
4547 content_hash: encrypted_meta.as_ref().map(|m| m.content_hash.clone()),
4548 created_at: now_unix(),
4549 };
4550 if let Err(e) = repo::upsert_attachment(&self.db, &attachment) {
4551 warn!(%e, "upsert attachment");
4552 return;
4553 }
4554 self.file_manager.set_expected_size(&file_id, size_bytes);
4557 let _ = self.app_event_tx.send(AppEvent::FileOffered {
4558 room_id: room_id.to_string(),
4559 file_id,
4560 name,
4561 size_bytes,
4562 sender_fingerprint,
4563 });
4564 }
4565
4566 fn handle_file_chunk(
4567 &self,
4568 room_id: &str,
4569 _sender_fingerprint: String,
4570 file_id: String,
4571 chunk_index: u32,
4572 total_chunks: u32,
4573 data_b64: String,
4574 ) {
4575 let data = match B64.decode(&data_b64) {
4576 Ok(d) => d,
4577 Err(e) => {
4578 warn!(%e, "bad chunk base64");
4579 return;
4580 }
4581 };
4582 let expected_size = match repo::get_attachment(&self.db, room_id, &file_id) {
4586 Ok(Some(a)) => {
4587 if matches!(
4588 a.status,
4589 AttachmentStatus::Cancelled | AttachmentStatus::Failed
4590 ) {
4591 return;
4592 }
4593 a.size_bytes as u64
4594 }
4595 Ok(None) => crate::files::MAX_FILE_SIZE,
4596 Err(e) => {
4597 warn!(%e, "get attachment for chunk");
4598 crate::files::MAX_FILE_SIZE
4599 }
4600 };
4601
4602 let result = self.file_manager.accept_chunk(
4603 &file_id,
4604 chunk_index,
4605 total_chunks,
4606 data,
4607 expected_size,
4608 );
4609 match result {
4610 Ok(None) => {
4611 let _ = repo::update_attachment_status(
4613 &self.db,
4614 room_id,
4615 &file_id,
4616 AttachmentStatus::Downloading,
4617 None,
4618 );
4619 let bytes_so_far = self
4622 .file_manager
4623 .progress(&file_id)
4624 .map(|(b, _)| b)
4625 .unwrap_or(0);
4626 let _ = self.app_event_tx.send(AppEvent::FileProgress {
4627 file_id: file_id.clone(),
4628 bytes_received: bytes_so_far,
4629 total_bytes: expected_size,
4630 });
4631 }
4632 Ok(Some(completed)) => {
4633 let _ = repo::update_attachment_paths(
4634 &self.db,
4635 room_id,
4636 &file_id,
4637 Some(&completed.cache_path.to_string_lossy()),
4638 None,
4639 );
4640 let _ = repo::update_attachment_status(
4641 &self.db,
4642 room_id,
4643 &file_id,
4644 AttachmentStatus::Ready,
4645 None,
4646 );
4647 let _ = self.app_event_tx.send(AppEvent::FileReady {
4648 file_id: file_id.clone(),
4649 });
4650 }
4651 Err(e) => {
4652 let msg = e.to_string();
4653 warn!(%msg, "chunk processing failed");
4654 let _ = repo::update_attachment_status(
4655 &self.db,
4656 room_id,
4657 &file_id,
4658 AttachmentStatus::Failed,
4659 Some(&msg),
4660 );
4661 let _ = self.app_event_tx.send(AppEvent::FileFailed {
4662 file_id: file_id.clone(),
4663 reason: msg,
4664 });
4665 }
4666 }
4667 }
4668
4669 fn maybe_emit_mention(&self, room_id: &str, body: &str) {
4681 let full = self.identity.fingerprint().to_lowercase();
4682 let short: String = full.chars().filter(|c| c.is_ascii_hexdigit()).take(8).collect();
4685 let lower = body.to_lowercase();
4686 let hit = lower.contains(full.as_str())
4687 || lower
4688 .split(|c: char| !c.is_ascii_hexdigit())
4689 .any(|tok| tok == short);
4690 if hit {
4691 let _ = self.app_event_tx.send(AppEvent::MentionReceived {
4692 room_id: room_id.to_string(),
4693 body: body.to_string(),
4694 });
4695 }
4696 }
4697
4698 fn decrypt_attachment(
4699 &self,
4700 room_id: &str,
4701 sender_fingerprint: &str,
4702 ciphertext: &[u8],
4703 meta: &EncryptedFileMeta,
4704 ) -> Result<Vec<u8>> {
4705 let mut rooms = self.active_rooms.lock().unwrap();
4706 let room = rooms
4707 .get_mut(room_id)
4708 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
4709 let crypto = room
4710 .crypto
4711 .as_mut()
4712 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
4713 file_encryption::decrypt_file(ciphertext, meta, crypto, sender_fingerprint)
4714 }
4715
4716 pub async fn go_dark(&self, master_passphrase: &str) -> Result<()> {
4728 let no_master = self.session_persist_key == [0u8; 32];
4729 if !no_master {
4730 let salt = storage::keychain::load_or_create_salt()?;
4731 let candidate_master =
4732 storage::keychain::derive_master_key(master_passphrase, &salt)?;
4733 let candidate_subkey =
4734 storage::keychain::derive_subkey(&candidate_master, b"megolm-persist");
4735 if !ct_eq_32(&candidate_subkey, &self.session_persist_key) {
4736 return Err(HuddleError::Other(
4737 "incorrect master passphrase".into(),
4738 ));
4739 }
4740 }
4741
4742 let room_ids: Vec<String> = self
4743 .active_rooms
4744 .lock()
4745 .unwrap()
4746 .keys()
4747 .cloned()
4748 .collect();
4749 let _ = tokio::time::timeout(Duration::from_secs(2), async {
4750 for room_id in &room_ids {
4751 if let Err(e) = self.leave_room(room_id).await {
4752 warn!(%room_id, %e, "go_dark: leave_room failed");
4753 }
4754 }
4755 })
4756 .await;
4757
4758 self.network.shutdown().await;
4759 tokio::time::sleep(Duration::from_millis(300)).await;
4760
4761 let data_dir = config::data_dir();
4762 let candidates = [
4763 "huddle.db",
4764 "huddle.db-shm",
4765 "huddle.db-wal",
4766 "keychain.salt",
4767 "huddle.log",
4768 "config.toml",
4769 ];
4770 for name in &candidates {
4771 let path = data_dir.join(name);
4772 wipe_file(&path);
4773 }
4774 if let Ok(read) = std::fs::read_dir(&data_dir) {
4775 for entry in read.flatten() {
4776 if let Some(name) = entry.file_name().to_str() {
4777 if name.starts_with("huddle.log.") {
4778 wipe_file(&entry.path());
4779 }
4780 }
4781 }
4782 }
4783 let files_dir = data_dir.join("files");
4787 if let Ok(read) = std::fs::read_dir(&files_dir) {
4788 for entry in read.flatten() {
4789 let path = entry.path();
4790 if path.is_file() {
4791 wipe_file(&path);
4792 } else if path.is_dir() {
4793 if let Ok(inner) = std::fs::read_dir(&path) {
4796 for inner_entry in inner.flatten() {
4797 if inner_entry.path().is_file() {
4798 wipe_file(&inner_entry.path());
4799 }
4800 }
4801 }
4802 let _ = std::fs::remove_dir(&path);
4803 }
4804 }
4805 }
4806 let _ = std::fs::remove_dir(&files_dir);
4807 let _ = std::fs::remove_dir(&data_dir);
4808
4809 let _ = self.app_event_tx.send(AppEvent::WentDark);
4810 Ok(())
4811 }
4812}
4813
4814pub fn normalize_to_fingerprint(input: &str) -> Option<String> {
4821 let s = input
4822 .trim()
4823 .trim_start_matches("HD-")
4824 .trim_start_matches("hd-")
4825 .to_string();
4826 let hex_only: String = s.chars().filter(|c| *c != '-').collect();
4827 if hex_only.len() != 24 || !hex_only.chars().all(|c| c.is_ascii_hexdigit()) {
4828 return None;
4829 }
4830 let lower = hex_only.to_ascii_lowercase();
4831 let chunks: Vec<String> = lower
4832 .as_bytes()
4833 .chunks(4)
4834 .map(|c| std::str::from_utf8(c).unwrap().to_string())
4835 .collect();
4836 Some(chunks.join("-"))
4837}
4838
4839fn address_preference(addr: &str) -> u8 {
4845 if addr.contains("/p2p-circuit") {
4846 return 9; }
4848 if let Some(rest) = addr.strip_prefix("/ip4/") {
4849 if let Some(ip_str) = rest.split('/').next() {
4850 if let Ok(ip) = ip_str.parse::<std::net::Ipv4Addr>() {
4851 if ip.is_loopback() {
4852 return 1; }
4854 if is_rfc1918(&ip) || ip.is_link_local() {
4855 return 0; }
4857 return 3; }
4859 }
4860 return 3;
4861 }
4862 if addr.starts_with("/ip6/") {
4863 return 4;
4864 }
4865 if addr.starts_with("/dns4/") || addr.starts_with("/dns6/") || addr.starts_with("/dnsaddr/") {
4866 return 5;
4867 }
4868 7
4869}
4870
4871fn is_rfc1918(ip: &std::net::Ipv4Addr) -> bool {
4875 let octets = ip.octets();
4876 octets[0] == 10
4877 || (octets[0] == 172 && (16..=31).contains(&octets[1]))
4878 || (octets[0] == 192 && octets[1] == 168)
4879}
4880
4881fn short_fp_for_msg(fingerprint: &str) -> String {
4885 let head: String = fingerprint
4886 .chars()
4887 .filter(|c| *c != '-')
4888 .take(4)
4889 .collect::<String>()
4890 .to_ascii_uppercase();
4891 format!("HD-{}…", head)
4892}
4893
4894fn ct_eq_32(a: &[u8; 32], b: &[u8; 32]) -> bool {
4898 let mut diff = 0u8;
4899 for i in 0..32 {
4900 diff |= a[i] ^ b[i];
4901 }
4902 diff == 0
4903}
4904
4905fn wipe_file(path: &Path) {
4909 use std::io::Write;
4910 const SCRATCH: usize = 64 * 1024;
4916 if let Ok(meta) = std::fs::metadata(path) {
4917 if let Ok(mut f) = std::fs::OpenOptions::new().write(true).open(path) {
4918 let zeros = [0u8; SCRATCH];
4919 let mut remaining = meta.len();
4920 while remaining > 0 {
4921 let n = remaining.min(SCRATCH as u64) as usize;
4922 if f.write_all(&zeros[..n]).is_err() {
4923 break;
4924 }
4925 remaining -= n as u64;
4926 }
4927 let _ = f.sync_all();
4928 }
4929 }
4930 if let Err(e) = std::fs::remove_file(path) {
4931 if e.kind() != std::io::ErrorKind::NotFound {
4932 warn!(?path, %e, "wipe_file: remove failed");
4933 }
4934 }
4935}
4936
4937fn open_with_system(path: &str) -> Result<()> {
4939 #[cfg(target_os = "macos")]
4940 let cmd = "open";
4941 #[cfg(target_os = "linux")]
4942 let cmd = "xdg-open";
4943 #[cfg(target_os = "windows")]
4944 let cmd = "cmd";
4945 #[cfg(target_os = "windows")]
4946 let args = vec!["/C", "start", "", path];
4947 #[cfg(not(target_os = "windows"))]
4948 let args = vec![path];
4949
4950 std::process::Command::new(cmd)
4951 .args(args)
4952 .spawn()
4953 .map_err(|e| HuddleError::Other(format!("spawn opener: {e}")))?;
4954 Ok(())
4955}
4956
4957static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
4960 std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
4961
4962pub fn salt_len() -> usize {
4967 SALT_LEN
4968}
4969
4970fn now_unix() -> i64 {
4971 SystemTime::now()
4972 .duration_since(UNIX_EPOCH)
4973 .unwrap()
4974 .as_secs() as i64
4975}
4976
4977fn now_unix_ms() -> i64 {
4978 SystemTime::now()
4979 .duration_since(UNIX_EPOCH)
4980 .unwrap()
4981 .as_millis() as i64
4982}
4983
4984fn generate_join_passphrase() -> String {
4990 use rand::RngCore;
4991 let mut bytes = [0u8; 16];
4992 rand::thread_rng().fill_bytes(&mut bytes);
4993 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes)
4996}
4997
4998fn generate_alphanumeric_code(len: usize) -> String {
5007 use rand::Rng;
5008 const ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
5009 let mut rng = rand::thread_rng();
5010 let mut out = String::with_capacity(len + 1);
5011 for i in 0..len {
5012 if i == 4 && len == 8 {
5013 out.push('-'); }
5015 let idx = rng.gen_range(0..ALPHABET.len());
5016 out.push(ALPHABET[idx] as char);
5017 }
5018 out
5019}
5020
5021#[cfg(test)]
5022mod parser_tests {
5023 use super::parse_dial_address;
5024
5025 #[test]
5026 fn parses_ipv4_port() {
5027 let m = parse_dial_address("10.3.72.53:9027").unwrap();
5028 assert_eq!(m.to_string(), "/ip4/10.3.72.53/tcp/9027");
5029 }
5030
5031 #[test]
5032 fn parses_bracketed_ipv6() {
5033 let m = parse_dial_address("[::1]:9027").unwrap();
5034 assert_eq!(m.to_string(), "/ip6/::1/tcp/9027");
5035 }
5036
5037 #[test]
5038 fn rejects_unbracketed_ipv6() {
5039 let err = parse_dial_address("fe80::1:9027").unwrap_err();
5040 assert!(err.to_string().contains("brackets"));
5041 }
5042
5043 #[test]
5044 fn passes_through_raw_multiaddr() {
5045 let m = parse_dial_address("/ip4/1.2.3.4/tcp/9000").unwrap();
5046 assert_eq!(m.to_string(), "/ip4/1.2.3.4/tcp/9000");
5047 }
5048
5049 #[test]
5050 fn empty_address_is_error() {
5051 assert!(parse_dial_address(" ").is_err());
5052 }
5053
5054 #[test]
5055 fn rejects_bad_port() {
5056 assert!(parse_dial_address("1.2.3.4:notaport").is_err());
5057 }
5058}
5059
5060#[cfg(test)]
5061mod transport_preference_tests {
5062 use super::{address_preference, normalize_to_fingerprint};
5063
5064 #[test]
5065 fn lan_beats_public_beats_circuit() {
5066 let lan = address_preference("/ip4/192.168.1.5/tcp/9027");
5067 let pub_v4 = address_preference("/ip4/8.8.8.8/tcp/9027");
5068 let circuit = address_preference(
5069 "/ip4/1.2.3.4/tcp/4001/p2p/12D3Koo/p2p-circuit/p2p/12D3KooXYZ",
5070 );
5071 assert!(lan < pub_v4, "LAN {} should beat public {}", lan, pub_v4);
5072 assert!(
5073 pub_v4 < circuit,
5074 "public {} should beat circuit {}",
5075 pub_v4,
5076 circuit
5077 );
5078 }
5079
5080 #[test]
5081 fn all_rfc1918_ranges_are_lan() {
5082 assert_eq!(
5083 address_preference("/ip4/10.0.0.1/tcp/9027"),
5084 address_preference("/ip4/192.168.0.1/tcp/9027"),
5085 );
5086 assert_eq!(
5087 address_preference("/ip4/172.16.0.1/tcp/9027"),
5088 address_preference("/ip4/192.168.0.1/tcp/9027"),
5089 );
5090 assert!(
5092 address_preference("/ip4/172.32.0.1/tcp/9027")
5093 > address_preference("/ip4/172.16.0.1/tcp/9027")
5094 );
5095 }
5096
5097 #[test]
5098 fn normalize_id_accepts_branded_and_raw() {
5099 let canon = "aaaa-bbbb-cccc-dddd-eeee-ffff";
5100 assert_eq!(
5101 normalize_to_fingerprint("HD-AAAA-BBBB-CCCC-DDDD-EEEE-FFFF").as_deref(),
5102 Some(canon)
5103 );
5104 assert_eq!(
5105 normalize_to_fingerprint("aaaabbbbccccddddeeeeffff").as_deref(),
5106 Some(canon)
5107 );
5108 assert_eq!(normalize_to_fingerprint(canon).as_deref(), Some(canon));
5109 assert!(normalize_to_fingerprint("alice").is_none());
5110 assert!(normalize_to_fingerprint("HD-ZZZZ").is_none());
5111 }
5112}
5113
5114#[cfg(test)]
5115mod canonical_dm_room_id_tests {
5116 use super::canonical_dm_room_id;
5117
5118 #[test]
5119 fn dm_room_id_is_commutative() {
5120 let a = "aaaa-bbbb-cccc-dddd-eeee-ffff";
5123 let b = "1111-2222-3333-4444-5555-6666";
5124 assert_eq!(canonical_dm_room_id(a, b), canonical_dm_room_id(b, a));
5125 }
5126
5127 #[test]
5128 fn dm_room_id_differs_per_pair() {
5129 let a = "aaaa-bbbb-cccc-dddd-eeee-ffff";
5130 let b = "1111-2222-3333-4444-5555-6666";
5131 let c = "9999-8888-7777-6666-5555-4444";
5132 assert_ne!(canonical_dm_room_id(a, b), canonical_dm_room_id(a, c));
5133 assert_ne!(canonical_dm_room_id(a, b), canonical_dm_room_id(b, c));
5134 }
5135
5136 #[test]
5137 fn dm_room_id_is_stable() {
5138 let a = "aaaa-bbbb-cccc-dddd-eeee-ffff";
5142 let b = "1111-2222-3333-4444-5555-6666";
5143 let id1 = canonical_dm_room_id(a, b);
5144 let id2 = canonical_dm_room_id(a, b);
5145 assert_eq!(id1, id2);
5146 assert_eq!(id1.len(), 32);
5150 }
5151}