1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use base64::engine::general_purpose::STANDARD as B64;
9use base64::Engine;
10use libp2p::{Multiaddr, PeerId};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info, warn};
13
14use crate::config;
15use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
16use crate::crypto::RoomCrypto;
17use crate::error::{HuddleError, Result};
18use crate::files::encryption::{self as file_encryption, EncryptedFileMeta};
19use crate::files::FileManager;
20use crate::identity::Identity;
21use crate::network::events::NetworkEvent;
22use crate::network::protocol::{encode_wire, RoomAnnouncement, RoomMessage, WireMessage};
23use crate::network::{self, NetworkHandle, NetworkMode};
24use crate::storage::repo::{
25 self, derive_room_id, AttachmentStatus, KnownPeer, StoredAttachment, StoredRoom,
26 StoredRoomMember,
27};
28use crate::storage::{self, Db};
29
30pub use self::events::{AppEvent, DiscoveredRoom};
31
/// One stored known peer combined with its current connection state.
///
/// Values of this type are produced by `AppHandle::known_peers`.
#[derive(Debug, Clone)]
pub struct KnownPeerStatus {
    /// Address string exactly as stored for the known peer.
    pub address: String,
    /// Optional label stored alongside the address.
    pub label: Option<String>,
    /// Stored time of the last successful connection, if any
    /// (presumably Unix seconds, as produced by `now_unix` — confirm).
    pub last_connected_at: Option<i64>,
    /// Peer id currently mapped to this address in the connected-dial map, if any.
    pub connected_peer_id: Option<PeerId>,
}
41
42pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
45 let trimmed = input.trim();
46 if trimmed.is_empty() {
47 return Err(HuddleError::Other("address is empty".into()));
48 }
49 if trimmed.starts_with('/') {
50 return trimmed
51 .parse::<Multiaddr>()
52 .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
53 }
54 if let Some(rest) = trimmed.strip_prefix('[') {
55 let (host, port) = rest
56 .split_once("]:")
57 .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
58 let port: u16 = port
59 .parse()
60 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
61 return format!("/ip6/{}/tcp/{}", host, port)
62 .parse::<Multiaddr>()
63 .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
64 }
65 let (host, port) = trimmed
66 .rsplit_once(':')
67 .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
68 if host.contains(':') {
69 return Err(HuddleError::Other(format!(
70 "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
71 )));
72 }
73 let port: u16 = port
74 .parse()
75 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
76 format!("/ip4/{}/tcp/{}", host, port)
77 .parse::<Multiaddr>()
78 .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
79}
80
/// In-memory state for a room this node currently participates in.
struct ActiveRoom {
    /// Room metadata, in the same shape that is persisted to the database.
    info: StoredRoom,
    /// Room crypto state. The constructors visible in this file set it to
    /// `None` for unencrypted rooms.
    crypto: Option<RoomCrypto>,
    /// Key derived from the room passphrase and salt; `None` when the room was
    /// created or joined without encryption.
    passphrase_key: Option<[u8; KEY_LEN]>,
    /// Fingerprints of the members known for this room.
    members: HashSet<String>,
    /// NOTE(review): presumably fingerprint -> last typing timestamp; nothing
    /// in the visible code reads it — confirm.
    typers: HashMap<String, i64>,
    /// When `true`, `send_room_message` refuses to send into this room.
    read_only: bool,
    /// NOTE(review): presumably (code, timestamp) pairs for join codes issued
    /// by this node — confirm against the code that fills it.
    issued_codes: Vec<(String, i64)>,
}
105
/// NOTE(review): presumably how long a typing indicator stays valid, in
/// seconds; not referenced in the visible code — confirm.
const TYPING_TTL_SECS: i64 = 3;

/// A discovered room whose `last_seen` is older than this many seconds is
/// dropped by the discovered-room pruner.
const DISCOVERED_TTL_SECS: i64 = 45;
/// Period, in seconds, of the ticker that re-announces every active room.
const ANNOUNCE_INTERVAL_SECS: u64 = 15;
112
/// State of one in-progress SAS exchange.
///
/// NOTE(review): none of these fields are read in the visible code; the
/// per-field notes below are inferred from names and types — confirm against
/// the SAS handlers.
struct SasFlow {
    /// Room the flow belongs to (presumably a room id).
    room_id: String,
    /// Fingerprint of the other party (presumably).
    partner_fingerprint: String,
    /// Our X25519 secret for this flow.
    our_secret: x25519_dalek::StaticSecret,
    /// The SAS code, once available.
    sas_code: Option<crate::crypto::sas::SasCode>,
    /// Presumably set once the local user has confirmed the code.
    our_confirmed: bool,
    /// Presumably set once the partner has confirmed the code.
    their_confirmed: bool,
}
125
/// Cloneable handle to the running application core.
///
/// Shared state sits behind `Arc`, so clones handed to spawned tasks operate
/// on the same rooms, peers and event channel.
#[derive(Clone)]
pub struct AppHandle {
    /// Local identity, loaded from the database or generated on first start.
    identity: Arc<Identity>,
    /// Handle used to send commands to the network layer.
    network: NetworkHandle,
    /// Network mode this handle was started with.
    mode: NetworkMode,
    /// Rooms we are currently in, keyed by room id.
    active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
    /// Rooms learned from announcements, keyed by room id; entries older than
    /// `DISCOVERED_TTL_SECS` are pruned periodically.
    discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
    /// Encrypted rooms found in the database at startup that have not been
    /// re-joined yet, keyed by room id.
    restorable_rooms: Arc<Mutex<HashMap<String, StoredRoom>>>,
    /// Address string -> peer id for addresses currently considered connected.
    connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
    /// File manager rooted at the configured data directory.
    file_manager: Arc<FileManager>,
    /// Database handle.
    db: Db,
    /// Key passed to `RoomCrypto` for session persistence; all zeros when the
    /// handle was started without a master key.
    session_persist_key: [u8; 32],
    /// In-progress SAS flows (NOTE(review): key semantics not visible here).
    sas_flows: Arc<Mutex<HashMap<String, SasFlow>>>,
    /// NOTE(review): presumably X25519 secrets awaiting a join-code exchange,
    /// keyed by a pair of strings whose meaning is not visible here — confirm.
    pending_code_secrets:
        Arc<Mutex<HashMap<(String, String), x25519_dalek::StaticSecret>>>,
    /// Canonical dial address -> fingerprint claimed by the invite that
    /// triggered the dial.
    pending_invite_dials: Arc<Mutex<HashMap<String, String>>>,
    /// Addresses read by `nat_reachable_addrs` and `dialable_addrs`; the code
    /// that fills this set is outside the visible part of the file.
    nat_reachable_addrs: Arc<Mutex<HashSet<String>>>,
    /// Relay circuit addresses; listed first by `dialable_addrs`.
    relay_circuit_addrs: Arc<Mutex<HashSet<String>>>,
    /// Room creator fingerprint -> time of our last opportunistic dial to that
    /// creator's announced host addresses.
    host_addr_dial_attempts: Arc<Mutex<HashMap<String, i64>>>,
    /// NOTE(review): presumably key -> last profile broadcast time in
    /// milliseconds; not used in the visible code — confirm.
    last_profile_broadcast_at_ms: Arc<Mutex<HashMap<String, i64>>>,
    /// Sender side of the broadcast channel that delivers `AppEvent`s to
    /// subscribers.
    app_event_tx: broadcast::Sender<AppEvent>,
}
197
/// Minimum number of seconds between two opportunistic dials to the same room
/// creator's announced host addresses.
const HOST_ADDR_DIAL_BACKOFF_SECS: i64 = 300;

/// NOTE(review): presumably the minimum gap, in milliseconds, between profile
/// re-broadcasts; not referenced in the visible code — confirm.
const PROFILE_REBROADCAST_FLOOR_MS: i64 = 60_000;
206
207impl AppHandle {
208 pub async fn start() -> Result<Self> {
209 Self::start_with_options(NetworkMode::Mdns, 0, None, Vec::new()).await
210 }
211
212 pub async fn start_with_options(
213 mode: NetworkMode,
214 port: u16,
215 master_key: Option<&[u8; 32]>,
216 relays: Vec<Multiaddr>,
217 ) -> Result<Self> {
218 config::ensure_data_dir()?;
219 let session_persist_key = match master_key {
224 Some(mk) => storage::keychain::derive_subkey(mk, b"megolm-persist"),
225 None => [0u8; 32],
226 };
227 let db = storage::open_db(&config::db_path(), master_key)?;
228 Self::start_with_db_and_options(db, mode, port, session_persist_key, relays).await
229 }
230
231 pub async fn start_with_db(db: Db) -> Result<Self> {
232 Self::start_with_db_and_options(db, NetworkMode::Mdns, 0, [0u8; 32], Vec::new()).await
233 }
234
235 pub async fn start_with_db_and_options(
236 db: Db,
237 mode: NetworkMode,
238 port: u16,
239 session_persist_key: [u8; 32],
240 relays: Vec<Multiaddr>,
241 ) -> Result<Self> {
242 let identity = Self::load_or_create_identity(&db)?;
243 let identity = Arc::new(identity);
244 info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, relay_count = relays.len(), "identity loaded");
245
246 let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
247 let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
248 let network =
249 network::start_network_with(&identity, net_event_tx, mode, port, relays)?;
250
251 let active_rooms = Arc::new(Mutex::new(HashMap::new()));
252 let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
253 let restorable_rooms = Arc::new(Mutex::new(HashMap::new()));
254 let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
255 let file_manager = Arc::new(FileManager::new(&config::data_dir())?);
256
257 let handle = Self {
258 identity,
259 network,
260 mode,
261 active_rooms,
262 discovered_rooms,
263 restorable_rooms,
264 connected_dial_addrs,
265 file_manager,
266 db,
267 session_persist_key,
268 sas_flows: Arc::new(Mutex::new(HashMap::new())),
269 pending_code_secrets: Arc::new(Mutex::new(HashMap::new())),
270 pending_invite_dials: Arc::new(Mutex::new(HashMap::new())),
271 nat_reachable_addrs: Arc::new(Mutex::new(HashSet::new())),
272 relay_circuit_addrs: Arc::new(Mutex::new(HashSet::new())),
273 host_addr_dial_attempts: Arc::new(Mutex::new(HashMap::new())),
274 last_profile_broadcast_at_ms: Arc::new(Mutex::new(HashMap::new())),
275 app_event_tx,
276 };
277
278 handle.spawn_event_processor(net_event_rx);
279 handle.spawn_announcement_ticker();
280 handle.spawn_discovered_room_pruner();
281 handle.spawn_known_peer_reconnector();
282 handle.restore_rooms_from_db().await;
283
284 Ok(handle)
285 }
286
287 pub fn mode(&self) -> NetworkMode {
288 self.mode
289 }
290
291 pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
292 self.app_event_tx.subscribe()
293 }
294
295 pub fn fingerprint(&self) -> &str {
296 self.identity.fingerprint()
297 }
298
299 pub fn peer_id(&self) -> PeerId {
300 self.identity.peer_id()
301 }
302
303 pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
304 let now = now_unix();
305 let mut by_id: HashMap<String, DiscoveredRoom> = self
306 .discovered_rooms
307 .lock()
308 .unwrap()
309 .clone();
310
311 for room in self.active_rooms.lock().unwrap().values() {
315 let entry = DiscoveredRoom {
316 room_id: room.info.id.clone(),
317 name: room.info.name.clone(),
318 encrypted: room.info.encrypted,
319 member_count: room.members.len() as u32,
320 creator_fingerprint: room.info.creator_fingerprint.clone(),
321 last_seen: now,
322 restorable: false,
323 host_addrs: Vec::new(),
324 };
325 by_id
326 .entry(room.info.id.clone())
327 .and_modify(|d| {
328 d.last_seen = now;
329 if entry.member_count > d.member_count {
330 d.member_count = entry.member_count;
331 }
332 d.restorable = false;
333 })
334 .or_insert(entry);
335 }
336
337 for (id, stored) in self.restorable_rooms.lock().unwrap().iter() {
341 if by_id.contains_key(id) {
342 continue;
343 }
344 by_id.insert(
345 id.clone(),
346 DiscoveredRoom {
347 room_id: id.clone(),
348 name: stored.name.clone(),
349 encrypted: stored.encrypted,
350 member_count: 0,
351 creator_fingerprint: stored.creator_fingerprint.clone(),
352 last_seen: stored.last_active.unwrap_or(stored.created_at),
353 restorable: true,
354 host_addrs: Vec::new(),
355 },
356 );
357 }
358
359 let mut v: Vec<DiscoveredRoom> = by_id.into_values().collect();
360 v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
361 v
362 }
363
364 pub fn active_room_ids(&self) -> Vec<String> {
365 self.active_rooms.lock().unwrap().keys().cloned().collect()
366 }
367
368 pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
369 self.active_rooms
370 .lock()
371 .unwrap()
372 .get(room_id)
373 .map(|r| r.info.clone())
374 }
375
376 pub fn room_members(&self, room_id: &str) -> Vec<String> {
377 self.active_rooms
378 .lock()
379 .unwrap()
380 .get(room_id)
381 .map(|r| {
382 let mut m: Vec<String> = r.members.iter().cloned().collect();
383 m.sort();
384 m
385 })
386 .unwrap_or_default()
387 }
388
389 pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
390 repo::get_room_messages(&self.db, room_id, limit)
391 }
392
393 pub fn search_room_messages(
394 &self,
395 room_id: &str,
396 query: &str,
397 limit: i64,
398 ) -> Result<Vec<repo::StoredRoomMessage>> {
399 repo::search_room_messages(&self.db, room_id, query, limit)
400 }
401
402 pub async fn start_room(
404 &self,
405 name: &str,
406 encrypted: bool,
407 passphrase: Option<&str>,
408 ) -> Result<String> {
409 if encrypted && passphrase.is_none() {
410 return Err(HuddleError::Other(
411 "encrypted room requires a passphrase".into(),
412 ));
413 }
414
415 let created_at = now_unix();
416 let creator_fp = self.identity.fingerprint().to_string();
417 let room_id = derive_room_id(&creator_fp, name, created_at);
418
419 let (passphrase_salt, passphrase_key) = if encrypted {
420 let salt = passphrase::random_salt();
421 let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
422 (Some(salt.to_vec()), Some(key))
423 } else {
424 (None, None)
425 };
426
427 let info = StoredRoom {
428 id: room_id.clone(),
429 name: name.to_string(),
430 creator_fingerprint: creator_fp.clone(),
431 encrypted,
432 passphrase_salt: passphrase_salt.clone(),
433 created_at,
434 last_active: Some(created_at),
435 };
436 repo::insert_room(&self.db, &info)?;
437
438 let crypto = if encrypted {
439 Some(RoomCrypto::new_for_room(
440 self.db.clone(),
441 room_id.clone(),
442 creator_fp.clone(),
443 self.session_persist_key,
444 )?)
445 } else {
446 None
447 };
448
449 let mut members = HashSet::new();
450 members.insert(creator_fp.clone());
451
452 repo::upsert_room_member(
456 &self.db,
457 &StoredRoomMember {
458 room_id: room_id.clone(),
459 peer_id: String::new(),
460 fingerprint: creator_fp.clone(),
461 last_seen: Some(created_at),
462 verified: true, ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
464 role: "owner".into(),
465 },
466 )?;
467
468 self.active_rooms.lock().unwrap().insert(
469 room_id.clone(),
470 ActiveRoom {
471 info: info.clone(),
472 crypto,
473 passphrase_key,
474 members,
475 typers: HashMap::new(),
476 read_only: false,
477 issued_codes: Vec::new(),
478 },
479 );
480
481 self.network.subscribe_room(room_id.clone()).await;
482 self.announce_room_now(&info, 1).await;
483
484 let app = self.clone();
487 let rid = room_id.clone();
488 tokio::spawn(async move {
489 tokio::time::sleep(Duration::from_millis(500)).await;
490 if let Err(e) = app.broadcast_member_announce(&rid).await {
491 warn!(%e, "broadcast member announce");
492 }
493 });
494
495 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
496 room_id: room_id.clone(),
497 });
498
499 Ok(room_id)
500 }
501
502 pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
506 let (name, creator_fingerprint, encrypted, salt_opt) = {
508 if let Some(d) = self.discovered_rooms.lock().unwrap().get(room_id).cloned() {
509 let salt = self.get_room_salt(room_id);
510 (d.name, d.creator_fingerprint, d.encrypted, salt)
511 } else if let Some(stored) = self.restorable_rooms.lock().unwrap().get(room_id).cloned()
512 {
513 (
514 stored.name,
515 stored.creator_fingerprint,
516 stored.encrypted,
517 stored.passphrase_salt,
518 )
519 } else if let Some(stored) = repo::get_room(&self.db, room_id)? {
520 (
521 stored.name,
522 stored.creator_fingerprint,
523 stored.encrypted,
524 stored.passphrase_salt,
525 )
526 } else {
527 return Err(HuddleError::Other(format!("room {room_id} not found")));
528 }
529 };
530
531 if encrypted && passphrase.is_none() {
532 return Err(HuddleError::Other(
533 "encrypted room requires a passphrase".into(),
534 ));
535 }
536
537 let passphrase_key = if encrypted {
538 let salt = salt_opt
539 .clone()
540 .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
541 Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
542 } else {
543 None
544 };
545
546 let info = StoredRoom {
547 id: room_id.to_string(),
548 name,
549 creator_fingerprint,
550 encrypted,
551 passphrase_salt: salt_opt.clone(),
552 created_at: now_unix(),
553 last_active: Some(now_unix()),
554 };
555 repo::insert_room(&self.db, &info)?;
556
557 let crypto = if encrypted {
558 let our_fp = self.identity.fingerprint().to_string();
561 let existing = RoomCrypto::load(
562 self.db.clone(),
563 room_id.to_string(),
564 our_fp.clone(),
565 self.session_persist_key,
566 )?;
567 Some(match existing {
568 Some(c) => c,
569 None => RoomCrypto::new_for_room(
570 self.db.clone(),
571 room_id.to_string(),
572 our_fp,
573 self.session_persist_key,
574 )?,
575 })
576 } else {
577 None
578 };
579
580 let mut members = HashSet::new();
581 members.insert(self.identity.fingerprint().to_string());
582
583 self.active_rooms.lock().unwrap().insert(
584 room_id.to_string(),
585 ActiveRoom {
586 info: info.clone(),
587 crypto,
588 passphrase_key,
589 members,
590 typers: HashMap::new(),
591 read_only: false,
592 issued_codes: Vec::new(),
593 },
594 );
595 self.restorable_rooms.lock().unwrap().remove(room_id);
597
598 self.network.subscribe_room(room_id.to_string()).await;
599
600 let app = self.clone();
601 let rid = room_id.to_string();
602 tokio::spawn(async move {
603 tokio::time::sleep(Duration::from_millis(500)).await;
604 if let Err(e) = app.broadcast_member_announce(&rid).await {
605 warn!(%e, "broadcast member announce");
606 }
607 let req = RoomMessage::SessionKeyRequest {
609 requester_fingerprint: app.identity.fingerprint().to_string(),
610 };
611 if let Ok(bytes) = encode_wire(&req) {
612 app.network.publish_room_message(rid.clone(), bytes).await;
613 }
614 });
615
616 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
617 room_id: room_id.to_string(),
618 });
619
620 Ok(())
621 }
622
623 async fn restore_rooms_from_db(&self) {
628 let rooms = match repo::list_rooms(&self.db) {
629 Ok(v) => v,
630 Err(e) => {
631 warn!(%e, "list rooms on restore");
632 return;
633 }
634 };
635 let our_fp = self.identity.fingerprint().to_string();
636 let count = rooms.len();
637 for info in rooms {
638 if info.encrypted {
639 self.restorable_rooms
640 .lock()
641 .unwrap()
642 .insert(info.id.clone(), info);
643 continue;
644 }
645 let mut members = HashSet::new();
646 members.insert(our_fp.clone());
647 if let Ok(stored_members) = repo::list_room_members(&self.db, &info.id) {
648 for m in stored_members {
649 members.insert(m.fingerprint);
650 }
651 }
652 let member_count = members.len() as u32;
653 self.active_rooms.lock().unwrap().insert(
654 info.id.clone(),
655 ActiveRoom {
656 info: info.clone(),
657 crypto: None,
658 passphrase_key: None,
659 members,
660 typers: HashMap::new(),
661 read_only: false,
662 issued_codes: Vec::new(),
663 },
664 );
665 self.network.subscribe_room(info.id.clone()).await;
666 self.announce_room_now(&info, member_count).await;
667 info!(room_id = %info.id, name = %info.name, "restored room");
668 }
669 if count > 0 {
670 debug!(count, "restored rooms from db");
671 }
672 }
673
674 pub async fn leave_room(&self, room_id: &str) -> Result<bool> {
679 let leave_msg = RoomMessage::MemberLeave {
681 sender_fingerprint: self.identity.fingerprint().to_string(),
682 };
683 let dispatched = match encode_wire(&leave_msg) {
684 Ok(bytes) => {
685 self.network
686 .publish_room_message(room_id.to_string(), bytes)
687 .await;
688 true
689 }
690 Err(e) => {
691 warn!(%e, %room_id, "failed to encode MemberLeave notice");
692 false
693 }
694 };
695
696 self.active_rooms.lock().unwrap().remove(room_id);
697 self.network.unsubscribe_room(room_id.to_string()).await;
698
699 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
700 room_id: room_id.to_string(),
701 });
702 Ok(dispatched)
703 }
704
705 pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
706 let our_fp = self.identity.fingerprint().to_string();
707 let msg = {
708 let mut rooms = self.active_rooms.lock().unwrap();
709 let room = rooms
710 .get_mut(room_id)
711 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
712
713 if room.read_only {
714 return Err(HuddleError::Other(
715 "this room is read-only — you joined via code without the passphrase. Ask an owner for the passphrase or wait for a key rotation that includes you.".into(),
716 ));
717 }
718
719 if room.info.encrypted {
720 let crypto = room
721 .crypto
722 .as_mut()
723 .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
724 let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
725 RoomMessage::Encrypted {
726 sender_fingerprint: our_fp.clone(),
727 session_id,
728 ciphertext_b64: base64::Engine::encode(
729 &base64::engine::general_purpose::STANDARD,
730 &ct_bytes,
731 ),
732 }
733 } else {
734 RoomMessage::Plain {
735 sender_fingerprint: our_fp.clone(),
736 body: body.to_string(),
737 }
738 }
739 };
740
741 let bytes = encode_wire(&msg)?;
742 self.network
743 .publish_room_message(room_id.to_string(), bytes)
744 .await;
745
746 let now = now_unix();
747 let msg_id =
748 repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
749 repo::update_room_last_active(&self.db, room_id, now)?;
750
751 let _ = self.app_event_tx.send(AppEvent::MessageSent {
752 room_id: room_id.to_string(),
753 body: body.to_string(),
754 message_id: msg_id,
755 });
756
757 Ok(())
758 }
759
760 pub async fn shutdown(&self) {
761 self.network.shutdown().await;
762 }
763
764 pub async fn dial_by_id_or_username(&self, input: &str) -> Result<()> {
791 let trimmed = input.trim();
792 if trimmed.is_empty() {
793 return Err(HuddleError::Other("input is empty".into()));
794 }
795 let target_fp = if let Some(fp) = normalize_to_fingerprint(trimmed) {
796 fp
797 } else {
798 let matches = repo::find_peers_by_username(&self.db, trimmed)?;
799 if matches.is_empty() {
800 return Err(HuddleError::Other(format!(
801 "no peer named `{}` known yet — paste their invite link instead",
802 trimmed
803 )));
804 }
805 if matches.len() > 1 {
806 return Err(HuddleError::Other(format!(
807 "username `{}` is ambiguous ({} peers share it) — use their HD- ID instead",
808 trimmed,
809 matches.len()
810 )));
811 }
812 matches.into_iter().next().unwrap()
813 };
814 if target_fp == self.identity.fingerprint() {
815 return Err(HuddleError::Other("that's your own ID".into()));
816 }
817 let candidates = self.resolve_dial_addrs(&target_fp);
818 if candidates.is_empty() {
819 return Err(HuddleError::Other(format!(
820 "haven't seen `{}` on the network yet — ask them for an invite link",
821 short_fp_for_msg(&target_fp)
822 )));
823 }
824 let now = now_unix();
829 for addr in &candidates {
830 let _ = repo::upsert_known_peer(
831 &self.db,
832 &KnownPeer {
833 address: addr.clone(),
834 label: None,
835 last_connected_at: None,
836 last_attempt_at: Some(now),
837 created_at: now,
838 fingerprint: Some(target_fp.clone()),
839 trusted: false,
840 },
841 );
842 }
843 let multiaddrs: Vec<Multiaddr> = candidates
847 .iter()
848 .filter_map(|s| s.parse::<Multiaddr>().ok())
849 .collect();
850 if multiaddrs.is_empty() {
851 return Err(HuddleError::Other(
852 "every known address for that peer is malformed".into(),
853 ));
854 }
855 let _ = self.app_event_tx.send(AppEvent::Dialing {
856 address: candidates[0].clone(),
857 });
858 info!(
859 target_fp = %target_fp,
860 n = multiaddrs.len(),
861 "dialing peer with {} candidate addresses",
862 multiaddrs.len()
863 );
864 self.network.dial_addresses(multiaddrs).await;
865 Ok(())
866 }
867
868 fn resolve_dial_addrs(&self, fingerprint: &str) -> Vec<String> {
876 let mut set: std::collections::HashSet<String> = std::collections::HashSet::new();
877 for room in self.discovered_rooms.lock().unwrap().values() {
878 if room.creator_fingerprint == fingerprint {
879 for addr in &room.host_addrs {
880 set.insert(addr.clone());
881 }
882 }
883 }
884 if let Ok(known) = repo::list_known_peers(&self.db) {
885 for peer in known {
886 if peer.fingerprint.as_deref() == Some(fingerprint) {
887 set.insert(peer.address);
888 }
889 }
890 }
891 let mut v: Vec<String> = set.into_iter().collect();
892 v.sort_by_key(|a| address_preference(a));
893 v
894 }
895
896 pub async fn dial(&self, input: &str) -> Result<()> {
897 let multiaddr = parse_dial_address(input)?;
898 let canonical = multiaddr.to_string();
899 info!(%canonical, "dialing");
900
901 repo::upsert_known_peer(
902 &self.db,
903 &KnownPeer {
904 address: canonical.clone(),
905 label: None,
906 last_connected_at: None,
907 last_attempt_at: Some(now_unix()),
908 created_at: now_unix(),
909 fingerprint: None,
913 trusted: false,
914 },
915 )?;
916
917 let _ = self.app_event_tx.send(AppEvent::Dialing {
918 address: canonical.clone(),
919 });
920 self.network.dial(multiaddr).await;
921 Ok(())
922 }
923
924 pub fn nat_reachable_addrs(&self) -> Vec<String> {
929 self.nat_reachable_addrs
930 .lock()
931 .unwrap()
932 .iter()
933 .cloned()
934 .collect()
935 }
936
937 pub fn dialable_addrs(&self) -> Vec<String> {
945 let mut out: Vec<String> = self
946 .relay_circuit_addrs
947 .lock()
948 .unwrap()
949 .iter()
950 .cloned()
951 .collect();
952 for a in self.nat_reachable_addrs.lock().unwrap().iter() {
953 if !out.contains(a) {
954 out.push(a.clone());
955 }
956 }
957 out.truncate(4);
958 out
959 }
960
961 pub async fn dial_invite(&self, address: &str, claimed_fp: &str) -> Result<()> {
974 let multiaddr = parse_dial_address(address)?;
975 let canonical = multiaddr.to_string();
976 self.pending_invite_dials
977 .lock()
978 .unwrap()
979 .insert(canonical.clone(), claimed_fp.to_string());
980 self.dial(address).await
983 }
984
985 pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
986 let connected = self.connected_dial_addrs.lock().unwrap().clone();
987 let stored = repo::list_known_peers(&self.db).unwrap_or_default();
988 stored
989 .into_iter()
990 .map(|p| {
991 let connected_peer = connected.get(&p.address).copied();
992 KnownPeerStatus {
993 address: p.address,
994 label: p.label,
995 last_connected_at: p.last_connected_at,
996 connected_peer_id: connected_peer,
997 }
998 })
999 .collect()
1000 }
1001
1002 pub async fn forget_peer(&self, address: &str) -> Result<()> {
1003 repo::forget_known_peer(&self.db, address)?;
1004 self.connected_dial_addrs.lock().unwrap().remove(address);
1005 Ok(())
1006 }
1007
1008 pub async fn redial(&self, address: &str) -> Result<()> {
1010 self.dial(address).await
1011 }
1012
1013 pub async fn accept_inbound(&self, peer_id: PeerId, address: &str) {
1018 self.network.accept_inbound(peer_id).await;
1019 self.connected_dial_addrs
1020 .lock()
1021 .unwrap()
1022 .insert(address.to_string(), peer_id);
1023 }
1024
1025 pub async fn reject_inbound(&self, peer_id: PeerId, fingerprint: &str) -> Result<()> {
1030 self.network.reject_inbound(peer_id).await;
1031 repo::block_peer(&self.db, fingerprint, now_unix())?;
1032 Ok(())
1033 }
1034
1035 pub async fn trust_inbound(
1038 &self,
1039 peer_id: PeerId,
1040 fingerprint: &str,
1041 address: &str,
1042 ) -> Result<()> {
1043 self.network.accept_inbound(peer_id).await;
1044 self.connected_dial_addrs
1045 .lock()
1046 .unwrap()
1047 .insert(address.to_string(), peer_id);
1048 repo::upsert_known_peer(
1052 &self.db,
1053 &KnownPeer {
1054 address: address.to_string(),
1055 label: None,
1056 last_connected_at: Some(now_unix()),
1057 last_attempt_at: Some(now_unix()),
1058 created_at: now_unix(),
1059 fingerprint: Some(fingerprint.to_string()),
1060 trusted: true,
1061 },
1062 )?;
1063 Ok(())
1064 }
1065
1066 fn spawn_known_peer_reconnector(&self) {
1067 let handle = self.clone();
1068 tokio::spawn(async move {
1069 tokio::time::sleep(Duration::from_millis(500)).await;
1071 let known = repo::list_known_peers(&handle.db).unwrap_or_default();
1072 for (i, peer) in known.into_iter().enumerate() {
1076 let handle = handle.clone();
1077 tokio::spawn(async move {
1078 let jitter = (peer.address.len() as u64 * 37) % 200;
1081 tokio::time::sleep(Duration::from_millis(150 * i as u64 + jitter)).await;
1082 if let Err(e) = handle.dial(&peer.address).await {
1083 debug!(%e, addr = %peer.address, "auto-reconnect failed");
1084 }
1085 });
1086 }
1087 });
1088 }
1089
1090 fn load_or_create_identity(db: &Db) -> Result<Identity> {
1095 if let Some(stored) = repo::load_identity(db)? {
1096 let mut bytes = [0u8; 32];
1097 bytes.copy_from_slice(&stored.ed25519_secret);
1098 Identity::from_secret_bytes(bytes)
1099 } else {
1100 let id = Identity::generate()?;
1101 repo::save_identity(db, &id.secret_bytes(), now_unix())?;
1102 Ok(id)
1103 }
1104 }
1105
1106 fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
1107 self.active_rooms
1108 .lock()
1109 .unwrap()
1110 .get(room_id)
1111 .and_then(|r| r.info.passphrase_salt.clone())
1112 .or_else(|| {
1113 ROOM_SALT_CACHE
1115 .lock()
1116 .unwrap()
1117 .get(room_id)
1118 .cloned()
1119 })
1120 }
1121
1122 async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
1123 let owner_fingerprints =
1124 repo::list_room_owners(&self.db, &info.id).unwrap_or_default();
1125 let verified_only = repo::get_room_verified_only(&self.db, &info.id).unwrap_or(false);
1126 let host_addrs = self.dialable_addrs();
1127 let ann = RoomAnnouncement {
1128 room_id: info.id.clone(),
1129 name: info.name.clone(),
1130 encrypted: info.encrypted,
1131 passphrase_salt: info.passphrase_salt.clone(),
1132 member_count,
1133 creator_fingerprint: info.creator_fingerprint.clone(),
1134 announced_at: now_unix(),
1135 owner_fingerprints,
1136 verified_only,
1137 host_addrs,
1138 };
1139 self.network.announce_room(ann).await;
1140 }
1141
1142 async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
1143 let our_fp = self.identity.fingerprint().to_string();
1144 let wrapped = {
1145 let mut rooms = self.active_rooms.lock().unwrap();
1146 let room = rooms
1147 .get_mut(room_id)
1148 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
1149 if room.info.encrypted {
1150 let crypto = room.crypto.as_mut().unwrap();
1151 let session_key = crypto.our_session_key_b64();
1152 let passphrase_key = room
1153 .passphrase_key
1154 .as_ref()
1155 .ok_or_else(|| HuddleError::Session("missing passphrase key".into()))?;
1156 Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
1157 } else {
1158 None
1159 }
1160 };
1161 let display_name = repo::get_display_name(&self.db).unwrap_or(None);
1162 let msg = RoomMessage::MemberAnnounce {
1163 sender_fingerprint: our_fp,
1164 wrapped_session_key: wrapped,
1165 display_name,
1166 sender_ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
1167 };
1168 let bytes = encode_wire(&msg)?;
1169 self.network
1170 .publish_room_message(room_id.to_string(), bytes)
1171 .await;
1172 Ok(())
1173 }
1174
1175 fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
1176 let handle = self.clone();
1177 tokio::spawn(async move {
1178 while let Some(event) = net_rx.recv().await {
1179 handle.process_network_event(event).await;
1180 }
1181 info!("event processor stopped");
1182 });
1183 }
1184
1185 fn spawn_announcement_ticker(&self) {
1186 let handle = self.clone();
1187 tokio::spawn(async move {
1188 let mut interval =
1189 tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
1190 interval.tick().await; loop {
1192 interval.tick().await;
1193 let snapshot: Vec<(StoredRoom, u32)> = {
1194 let active = handle.active_rooms.lock().unwrap();
1195 active
1196 .values()
1197 .map(|r| (r.info.clone(), r.members.len() as u32))
1198 .collect()
1199 };
1200 for (info, member_count) in snapshot {
1201 handle.announce_room_now(&info, member_count).await;
1202 }
1203 }
1204 });
1205 }
1206
1207 fn spawn_discovered_room_pruner(&self) {
1208 let handle = self.clone();
1209 tokio::spawn(async move {
1210 let mut interval = tokio::time::interval(Duration::from_secs(10));
1211 interval.tick().await;
1212 loop {
1213 interval.tick().await;
1214 let now = now_unix();
1215 let mut to_drop = Vec::new();
1216 {
1217 let mut map = handle.discovered_rooms.lock().unwrap();
1218 map.retain(|id, r| {
1219 if now - r.last_seen > DISCOVERED_TTL_SECS {
1220 to_drop.push(id.clone());
1221 false
1222 } else {
1223 true
1224 }
1225 });
1226 }
1227 for id in to_drop {
1228 let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
1229 }
1230 }
1231 });
1232 }
1233
1234 async fn process_network_event(&self, event: NetworkEvent) {
1235 match event {
1236 NetworkEvent::PeerDiscovered { peer_id } => {
1237 let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
1238 }
1239 NetworkEvent::PeerExpired { peer_id } => {
1240 self.connected_dial_addrs
1246 .lock()
1247 .unwrap()
1248 .retain(|_addr, pid| *pid != peer_id);
1249 let _ = self.app_event_tx.send(AppEvent::PeerExpired { peer_id });
1250 }
1251 NetworkEvent::ListeningOn { address } => {
1252 let _ = self.app_event_tx.send(AppEvent::ListeningOn {
1253 address: address.to_string(),
1254 });
1255 }
1256 NetworkEvent::RoomAnnouncementReceived(ann) => {
1257 if let Some(salt) = &ann.passphrase_salt {
1259 ROOM_SALT_CACHE
1260 .lock()
1261 .unwrap()
1262 .insert(ann.room_id.clone(), salt.clone());
1263 }
1264 let our_fp_for_dial = self.identity.fingerprint().to_string();
1269 if ann.creator_fingerprint != our_fp_for_dial && !ann.host_addrs.is_empty() {
1270 let now = now_unix();
1271 let should_dial = {
1272 let mut attempts = self.host_addr_dial_attempts.lock().unwrap();
1273 match attempts.get(&ann.creator_fingerprint).copied() {
1274 Some(last) if now - last < HOST_ADDR_DIAL_BACKOFF_SECS => false,
1275 _ => {
1276 attempts.insert(ann.creator_fingerprint.clone(), now);
1277 true
1278 }
1279 }
1280 };
1281 if should_dial {
1282 if let Some(first) = ann.host_addrs.first() {
1283 info!(
1284 announcer = %ann.creator_fingerprint,
1285 addr = %first,
1286 "opportunistic dial via room announcement host_addrs"
1287 );
1288 let _ = self.dial(first).await;
1291 }
1292 }
1293 }
1294 let discovered = DiscoveredRoom {
1295 room_id: ann.room_id.clone(),
1296 name: ann.name.clone(),
1297 encrypted: ann.encrypted,
1298 member_count: ann.member_count,
1299 creator_fingerprint: ann.creator_fingerprint.clone(),
1300 last_seen: now_unix(),
1301 restorable: false,
1302 host_addrs: ann.host_addrs.clone(),
1303 };
1304 if self.active_rooms.lock().unwrap().contains_key(&ann.room_id) {
1309 self.discovered_rooms
1310 .lock()
1311 .unwrap()
1312 .insert(ann.room_id.clone(), discovered);
1313 return;
1314 }
1315 self.discovered_rooms
1316 .lock()
1317 .unwrap()
1318 .insert(ann.room_id.clone(), discovered.clone());
1319 let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
1320 }
1321 NetworkEvent::RoomMessageReceived {
1322 room_id,
1323 payload,
1324 from_peer: _,
1325 } => {
1326 let wire: WireMessage = match serde_json::from_slice(&payload) {
1333 Ok(w) => w,
1334 Err(e) => {
1335 warn!(%e, "bad wire envelope");
1336 return;
1337 }
1338 };
1339 let (msg, verified_signer) = match wire {
1340 WireMessage::Plain(m) => (m, None),
1341 WireMessage::Signed(env) => {
1342 let claimed_pubkey = env.ed25519_pubkey_b64.clone();
1343 match crate::crypto::verify_signed(&env) {
1344 Ok((m, fp)) => {
1345 match repo::get_member_ed25519_pubkey(
1352 &self.db, &room_id, &fp,
1353 ) {
1354 Ok(Some(known)) if known != claimed_pubkey => {
1355 warn!(
1356 %fp, %room_id,
1357 "pubkey mismatch vs stored; dropping signed message"
1358 );
1359 return;
1360 }
1361 _ => {}
1362 }
1363 (m, Some(fp))
1364 }
1365 Err(e) => {
1366 warn!(%e, fp = %env.fingerprint, "signed envelope verify failed");
1367 return;
1368 }
1369 }
1370 }
1371 };
1372 self.handle_room_message(&room_id, msg, verified_signer).await;
1373 }
1374 NetworkEvent::DialSucceeded { peer_id, address } => {
1375 let addr_s = address.to_string();
1376 self.connected_dial_addrs
1377 .lock()
1378 .unwrap()
1379 .insert(addr_s.clone(), peer_id);
1380 let _ = repo::upsert_known_peer(
1384 &self.db,
1385 &KnownPeer {
1386 address: addr_s.clone(),
1387 label: None,
1388 last_connected_at: Some(now_unix()),
1389 last_attempt_at: Some(now_unix()),
1390 created_at: now_unix(),
1391 fingerprint: None,
1392 trusted: false,
1393 },
1394 );
1395 let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
1396 address: addr_s,
1397 peer_id,
1398 });
1399 }
1400 NetworkEvent::DialFailed { address, error } => {
1401 let addr_s = address.to_string();
1402 let _ = self.app_event_tx.send(AppEvent::DialFailed {
1403 address: addr_s,
1404 error,
1405 });
1406 }
1407 NetworkEvent::PeerIdentified { peer_id, fingerprint } => {
1408 let matched_addrs: Vec<String> = {
1414 let map = self.connected_dial_addrs.lock().unwrap();
1415 map.iter()
1416 .filter_map(|(addr, pid)| {
1417 if *pid == peer_id {
1418 Some(addr.clone())
1419 } else {
1420 None
1421 }
1422 })
1423 .collect()
1424 };
1425 let mismatch = {
1435 let mut map = self.pending_invite_dials.lock().unwrap();
1436 let mut found: Option<(String, String)> = None;
1437 for addr in &matched_addrs {
1438 if let Some(claimed) = map.remove(addr) {
1439 if claimed != fingerprint {
1440 found = Some((addr.clone(), claimed));
1441 break;
1442 }
1443 }
1444 }
1445 found
1446 };
1447 if let Some((addr, claimed)) = mismatch {
1448 warn!(
1449 %addr, %claimed, actual=%fingerprint,
1450 "invite fingerprint mismatch — disconnecting"
1451 );
1452 self.network.disconnect_peer(peer_id).await;
1453 let _ = self.app_event_tx.send(AppEvent::InviteFingerprintMismatch {
1454 address: addr,
1455 claimed,
1456 actual: fingerprint.clone(),
1457 });
1458 return;
1459 }
1460 for addr in matched_addrs {
1461 let _ = repo::upsert_known_peer(
1462 &self.db,
1463 &KnownPeer {
1464 address: addr,
1465 label: None,
1466 last_connected_at: Some(now_unix()),
1467 last_attempt_at: Some(now_unix()),
1468 created_at: now_unix(),
1469 fingerprint: Some(fingerprint.clone()),
1470 trusted: true,
1471 },
1472 );
1473 }
1474 let our_username = repo::get_display_name(&self.db).unwrap_or(None);
1482 if our_username.is_some() {
1483 let now_ms = now_unix_ms();
1484 let should_send = {
1485 let mut last = self.last_profile_broadcast_at_ms.lock().unwrap();
1486 match last.get(&fingerprint) {
1487 Some(prev) if now_ms - prev < PROFILE_REBROADCAST_FLOOR_MS => false,
1488 _ => {
1489 last.insert(fingerprint.clone(), now_ms);
1490 true
1491 }
1492 }
1493 };
1494 if should_send {
1495 let msg = RoomMessage::ProfileUpdate {
1496 sender_fingerprint: self.identity.fingerprint().to_string(),
1497 username: our_username,
1498 updated_at: now_ms,
1499 };
1500 if let Ok(env) = crate::crypto::sign_message(&self.identity, &msg) {
1501 if let Ok(bytes) =
1502 crate::network::protocol::encode_wire_signed(&env)
1503 {
1504 let rooms: Vec<String> = self
1505 .active_rooms
1506 .lock()
1507 .unwrap()
1508 .keys()
1509 .cloned()
1510 .collect();
1511 for room_id in rooms {
1512 self.network
1513 .publish_room_message(room_id, bytes.clone())
1514 .await;
1515 }
1516 }
1517 }
1518 }
1519 }
1520 }
1521 NetworkEvent::RelayReservationEstablished { address } => {
1522 info!(addr = %address, "relay reservation established");
1527 self.relay_circuit_addrs
1528 .lock()
1529 .unwrap()
1530 .insert(address.to_string());
1531 let _ = self.app_event_tx.send(AppEvent::ListeningOn {
1532 address: address.to_string(),
1533 });
1534 }
1535 NetworkEvent::NatProbeResult {
1536 tested_addr,
1537 reachable,
1538 } => {
1539 let addr_s = tested_addr.to_string();
1540 let (transitioned, becomes_reachable) = {
1541 let mut set = self.nat_reachable_addrs.lock().unwrap();
1542 let was_empty = set.is_empty();
1543 if reachable {
1544 set.insert(addr_s.clone());
1545 } else {
1546 set.remove(&addr_s);
1547 }
1548 let is_empty = set.is_empty();
1549 (was_empty != is_empty, !is_empty)
1550 };
1551 if transitioned {
1552 let label = if becomes_reachable {
1553 "reachable".to_string()
1554 } else {
1555 "private".to_string()
1556 };
1557 info!(reachable = %becomes_reachable, "NAT reachability changed");
1558 let _ = self.app_event_tx.send(AppEvent::NatStatusChanged {
1559 label,
1560 reachable: becomes_reachable,
1561 });
1562 }
1563 }
1564 NetworkEvent::DcutrUpgrade {
1565 remote_peer,
1566 success,
1567 } => {
1568 if success {
1569 let s = remote_peer.to_base58();
1573 let tail: String = s.chars().rev().take(8).collect::<String>()
1574 .chars()
1575 .rev()
1576 .collect();
1577 let _ = self.app_event_tx.send(AppEvent::DcutrSucceeded {
1578 peer_label: tail,
1579 });
1580 }
1581 }
1582 NetworkEvent::InboundDial {
1583 peer_id,
1584 fingerprint,
1585 address,
1586 } => {
1587 if repo::is_peer_blocked(&self.db, &fingerprint).unwrap_or(false) {
1589 info!(%fingerprint, "inbound dial auto-rejected: peer is blocked");
1590 self.network.reject_inbound(peer_id).await;
1591 return;
1592 }
1593 let global_verified_only =
1598 repo::get_setting(&self.db, "verified_only_inbound")
1599 .ok()
1600 .flatten()
1601 .map(|v| v == "1")
1602 .unwrap_or(false);
1603 if global_verified_only {
1604 let is_verified =
1605 repo::is_globally_verified(&self.db, &fingerprint).unwrap_or(false)
1606 || repo::is_fingerprint_trusted(&self.db, &fingerprint)
1607 .unwrap_or(false);
1608 if !is_verified {
1609 info!(
1610 %fingerprint,
1611 "inbound dial auto-rejected: verified-only mode"
1612 );
1613 self.network.reject_inbound(peer_id).await;
1614 return;
1615 }
1616 }
1617 if repo::is_fingerprint_trusted(&self.db, &fingerprint).unwrap_or(false) {
1618 info!(%fingerprint, "inbound dial auto-accepted: peer is trusted");
1619 self.connected_dial_addrs
1622 .lock()
1623 .unwrap()
1624 .insert(address.to_string(), peer_id);
1625 let _ = repo::upsert_known_peer(
1626 &self.db,
1627 &KnownPeer {
1628 address: address.to_string(),
1629 label: None,
1630 last_connected_at: Some(now_unix()),
1631 last_attempt_at: Some(now_unix()),
1632 created_at: now_unix(),
1633 fingerprint: Some(fingerprint),
1634 trusted: true,
1635 },
1636 );
1637 self.network.accept_inbound(peer_id).await;
1638 return;
1639 }
1640 let _ = self.app_event_tx.send(AppEvent::InboundDial {
1642 peer_id,
1643 fingerprint,
1644 address: address.to_string(),
1645 });
1646 }
1647 }
1648 }
1649
1650 async fn handle_room_message(
1656 &self,
1657 room_id: &str,
1658 msg: RoomMessage,
1659 verified_signer: Option<String>,
1660 ) {
1661 let our_fp = self.identity.fingerprint().to_string();
1662 match msg {
1663 RoomMessage::MemberAnnounce {
1664 sender_fingerprint,
1665 wrapped_session_key,
1666 display_name,
1667 sender_ed25519_pubkey,
1668 } => {
1669 if sender_fingerprint == our_fp {
1670 return;
1671 }
1672 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
1675 .unwrap_or(false)
1676 {
1677 info!(%sender_fingerprint, %room_id, "dropping MemberAnnounce from banned peer");
1678 return;
1679 }
1680 if repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
1687 && !repo::is_globally_verified(&self.db, &sender_fingerprint).unwrap_or(false)
1688 {
1689 info!(
1690 %sender_fingerprint, %room_id,
1691 "dropping MemberAnnounce: room is verified-only and joiner isn't verified"
1692 );
1693 let owners = repo::list_room_owners(&self.db, room_id).unwrap_or_default();
1694 let lowest_owner = owners.iter().min().cloned();
1695 if lowest_owner.as_deref() == Some(&our_fp) {
1696 let msg = RoomMessage::JoinRefused {
1697 room_id: room_id.to_string(),
1698 target_fingerprint: sender_fingerprint.clone(),
1699 reason: "room requires SAS verification — ask an existing member to verify you".into(),
1700 };
1701 if let Ok(env) = crate::crypto::sign_message(&self.identity, &msg) {
1702 if let Ok(bytes) =
1703 crate::network::protocol::encode_wire_signed(&env)
1704 {
1705 self.network
1706 .publish_room_message(room_id.to_string(), bytes)
1707 .await;
1708 }
1709 }
1710 }
1711 return;
1712 }
1713 let need_inbound = {
1714 let mut rooms = self.active_rooms.lock().unwrap();
1715 let room = match rooms.get_mut(room_id) {
1716 Some(r) => r,
1717 None => return,
1718 };
1719 let newly_added = room.members.insert(sender_fingerprint.clone());
1720 if newly_added {
1721 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
1722 room_id: room_id.to_string(),
1723 fingerprint: sender_fingerprint.clone(),
1724 });
1725 }
1726 let _ = repo::upsert_room_member(
1731 &self.db,
1732 &StoredRoomMember {
1733 room_id: room_id.to_string(),
1734 peer_id: String::new(), fingerprint: sender_fingerprint.clone(),
1736 last_seen: Some(now_unix()),
1737 verified: false,
1738 ed25519_pubkey: sender_ed25519_pubkey.clone(),
1739 role: "member".into(),
1745 },
1746 );
1747 if let Some(name) = display_name.as_deref() {
1748 let _ = repo::set_member_display_name(
1749 &self.db,
1750 room_id,
1751 &sender_fingerprint,
1752 Some(name),
1753 );
1754 }
1755 room.info.encrypted && wrapped_session_key.is_some()
1756 };
1757
1758 if need_inbound {
1759 let wrapped = wrapped_session_key.unwrap();
1760 let result = {
1761 let mut rooms = self.active_rooms.lock().unwrap();
1762 let room = rooms.get_mut(room_id).unwrap();
1763 let passphrase_key = match &room.passphrase_key {
1764 Some(k) => k,
1765 None => {
1766 warn!("no passphrase key when receiving session key");
1767 return;
1768 }
1769 };
1770 match passphrase::unwrap(&wrapped, passphrase_key) {
1771 Ok(plain) => match String::from_utf8(plain) {
1772 Ok(key_b64) => {
1773 let crypto = room.crypto.as_mut().unwrap();
1774 crypto.add_inbound_session(&sender_fingerprint, &key_b64)
1775 }
1776 Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
1777 },
1778 Err(e) => Err(e),
1779 }
1780 };
1781 if let Err(e) = result {
1782 error!(%e, "add inbound session failed");
1783 }
1784 }
1785 }
1786 RoomMessage::SessionKeyRequest {
1787 requester_fingerprint,
1788 } => {
1789 if requester_fingerprint == our_fp {
1790 return;
1791 }
1792 if let Err(e) = self.broadcast_member_announce(room_id).await {
1794 warn!(%e, "broadcast member announce on request");
1795 }
1796 }
1797 RoomMessage::Encrypted {
1798 sender_fingerprint,
1799 session_id,
1800 ciphertext_b64,
1801 } => {
1802 if sender_fingerprint == our_fp {
1803 return;
1804 }
1805 let ct_bytes = match base64::Engine::decode(
1806 &base64::engine::general_purpose::STANDARD,
1807 &ciphertext_b64,
1808 ) {
1809 Ok(b) => b,
1810 Err(e) => {
1811 warn!(%e, "bad base64 ciphertext");
1812 return;
1813 }
1814 };
1815 let plaintext = {
1816 let mut rooms = self.active_rooms.lock().unwrap();
1817 let room = match rooms.get_mut(room_id) {
1818 Some(r) => r,
1819 None => return,
1820 };
1821 let crypto = match room.crypto.as_mut() {
1822 Some(c) => c,
1823 None => return,
1824 };
1825 crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
1826 };
1827 match plaintext {
1828 Ok(pt) => {
1829 let body = String::from_utf8_lossy(&pt).to_string();
1830 let sent_at = now_unix();
1831 let _ = repo::insert_room_message(
1832 &self.db,
1833 room_id,
1834 &sender_fingerprint,
1835 "in",
1836 &body,
1837 sent_at,
1838 );
1839 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1840 self.maybe_emit_mention(room_id, &body);
1841 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1842 room_id: room_id.to_string(),
1843 sender_fingerprint,
1844 body,
1845 sent_at,
1846 });
1847 }
1848 Err(e) => {
1849 debug!(%e, "decrypt failed (probably missing session key)");
1850 }
1851 }
1852 }
1853 RoomMessage::Plain {
1854 sender_fingerprint,
1855 body,
1856 } => {
1857 if sender_fingerprint == our_fp {
1858 return;
1859 }
1860 let sent_at = now_unix();
1861 let _ = repo::insert_room_message(
1862 &self.db,
1863 room_id,
1864 &sender_fingerprint,
1865 "in",
1866 &body,
1867 sent_at,
1868 );
1869 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1870 self.maybe_emit_mention(room_id, &body);
1871 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1872 room_id: room_id.to_string(),
1873 sender_fingerprint,
1874 body,
1875 sent_at,
1876 });
1877 }
1878 RoomMessage::Typing { sender_fingerprint } => {
1879 if sender_fingerprint == our_fp {
1880 return;
1881 }
1882 let expiry = now_unix() + TYPING_TTL_SECS;
1883 let mut rooms = self.active_rooms.lock().unwrap();
1884 if let Some(room) = rooms.get_mut(room_id) {
1885 room.typers.insert(sender_fingerprint, expiry);
1886 }
1887 drop(rooms);
1888 let _ = self.app_event_tx.send(AppEvent::TypingChanged {
1889 room_id: room_id.to_string(),
1890 });
1891 }
1892 RoomMessage::RotateRoomKey {
1893 rotator_fingerprint,
1894 new_salt,
1895 } => {
1896 if rotator_fingerprint == our_fp {
1897 return;
1898 }
1899 let signer = match verified_signer {
1904 Some(fp) => fp,
1905 None => {
1906 warn!(%room_id, "RotateRoomKey arrived unsigned; dropping");
1907 return;
1908 }
1909 };
1910 if signer != rotator_fingerprint {
1911 warn!(
1912 %signer, %rotator_fingerprint, %room_id,
1913 "RotateRoomKey signer mismatch with claimed rotator; dropping"
1914 );
1915 return;
1916 }
1917 let _ = self.app_event_tx.send(AppEvent::RotationRequested {
1918 room_id: room_id.to_string(),
1919 rotator_fingerprint,
1920 new_salt,
1921 });
1922 }
1923 RoomMessage::MemberLeave { sender_fingerprint } => {
1924 if sender_fingerprint == our_fp {
1925 return;
1926 }
1927 let removed = {
1928 let mut rooms = self.active_rooms.lock().unwrap();
1929 if let Some(room) = rooms.get_mut(room_id) {
1930 room.members.remove(&sender_fingerprint)
1931 } else {
1932 false
1933 }
1934 };
1935 if removed {
1936 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
1937 room_id: room_id.to_string(),
1938 fingerprint: sender_fingerprint,
1939 });
1940 }
1941 }
1942 RoomMessage::FileOffer {
1943 sender_fingerprint,
1944 file_id,
1945 name,
1946 size_bytes,
1947 mime,
1948 chunk_count,
1949 encrypted_meta,
1950 } => {
1951 if sender_fingerprint == our_fp {
1952 return; }
1954 self.handle_file_offer(
1955 room_id,
1956 sender_fingerprint,
1957 file_id,
1958 name,
1959 size_bytes,
1960 mime,
1961 chunk_count,
1962 encrypted_meta,
1963 );
1964 }
1965 RoomMessage::FileChunk {
1966 sender_fingerprint,
1967 file_id,
1968 chunk_index,
1969 total_chunks,
1970 data_b64,
1971 } => {
1972 if sender_fingerprint == our_fp {
1973 return;
1974 }
1975 self.handle_file_chunk(
1976 room_id,
1977 sender_fingerprint,
1978 file_id,
1979 chunk_index,
1980 total_chunks,
1981 data_b64,
1982 );
1983 }
1984 RoomMessage::OwnerGrant {
1985 room_id: announced_room_id,
1986 target_fingerprint,
1987 } => {
1988 if announced_room_id != room_id {
1993 warn!(payload_room = %announced_room_id, topic_room = %room_id, "OwnerGrant room mismatch");
1994 return;
1995 }
1996 let signer = match verified_signer {
1997 Some(fp) => fp,
1998 None => {
1999 warn!(%room_id, "OwnerGrant arrived unsigned; dropping");
2000 return;
2001 }
2002 };
2003 if !self.is_owner(room_id, &signer) {
2004 warn!(%signer, %room_id, "OwnerGrant signer isn't an owner; dropping");
2005 return;
2006 }
2007 info!(%signer, %target_fingerprint, %room_id, "OwnerGrant applied");
2008 if let Err(e) =
2009 repo::set_member_role(&self.db, room_id, &target_fingerprint, "owner")
2010 {
2011 warn!(%e, "OwnerGrant: set_member_role failed");
2012 }
2013 }
2014 RoomMessage::BanMember {
2015 room_id: announced_room_id,
2016 target_fingerprint,
2017 } => {
2018 if announced_room_id != room_id {
2019 warn!(payload_room = %announced_room_id, topic_room = %room_id, "BanMember room mismatch");
2020 return;
2021 }
2022 let signer = match verified_signer {
2023 Some(fp) => fp,
2024 None => {
2025 warn!(%room_id, "BanMember arrived unsigned; dropping");
2026 return;
2027 }
2028 };
2029 if !self.is_owner(room_id, &signer) {
2030 warn!(%signer, %room_id, "BanMember signer isn't an owner; dropping");
2031 return;
2032 }
2033 if target_fingerprint == our_fp {
2034 info!(%room_id, %signer, "we were kicked from this room");
2040 self.active_rooms.lock().unwrap().remove(room_id);
2041 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
2042 room_id: room_id.to_string(),
2043 });
2044 return;
2045 }
2046 info!(%signer, %target_fingerprint, %room_id, "BanMember applied");
2047 if let Err(e) = repo::add_room_ban(
2048 &self.db,
2049 room_id,
2050 &target_fingerprint,
2051 &signer,
2052 "", now_unix(),
2054 ) {
2055 warn!(%e, "BanMember: add_room_ban failed");
2056 }
2057 self.evict_banned_member(room_id, &target_fingerprint);
2058 }
2059 RoomMessage::SasInit {
2060 tx_id,
2061 ephemeral_x25519_pubkey_b64,
2062 target_fingerprint,
2063 } => {
2064 if target_fingerprint != our_fp {
2065 return;
2070 }
2071 let signer = match verified_signer {
2072 Some(fp) => fp,
2073 None => {
2074 warn!("SasInit arrived unsigned; dropping");
2075 return;
2076 }
2077 };
2078 let their_pub =
2079 match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
2080 Ok(pk) => pk,
2081 Err(e) => {
2082 warn!(%e, "SasInit: bad x25519 pubkey");
2083 return;
2084 }
2085 };
2086 let tx_id_bytes = match B64.decode(&tx_id) {
2087 Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
2088 let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
2089 arr.copy_from_slice(&b);
2090 arr
2091 }
2092 _ => {
2093 warn!(%tx_id, "SasInit: bad tx_id length");
2094 return;
2095 }
2096 };
2097 let (_, our_secret, our_pub) = crate::crypto::sas::new_session();
2098 let sas_code =
2099 crate::crypto::sas::derive_sas_code(&our_secret, &their_pub, &tx_id_bytes);
2100 self.sas_flows.lock().unwrap().insert(
2101 tx_id.clone(),
2102 SasFlow {
2103 room_id: room_id.to_string(),
2104 partner_fingerprint: signer.clone(),
2105 our_secret,
2106 sas_code: Some(sas_code.clone()),
2107 our_confirmed: false,
2108 their_confirmed: false,
2109 },
2110 );
2111 let response = RoomMessage::SasResponse {
2114 tx_id: tx_id.clone(),
2115 ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2116 };
2117 if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
2118 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
2119 self.network
2120 .publish_room_message(room_id.to_string(), bytes)
2121 .await;
2122 }
2123 }
2124 let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
2125 room_id: room_id.to_string(),
2126 partner_fingerprint: signer,
2127 tx_id,
2128 emoji_string: sas_code.emoji_string(),
2129 emoji_labels: sas_code.emoji_labels(),
2130 decimal: sas_code.decimal,
2131 });
2132 }
2133 RoomMessage::SasResponse {
2134 tx_id,
2135 ephemeral_x25519_pubkey_b64,
2136 } => {
2137 let signer = match verified_signer {
2138 Some(fp) => fp,
2139 None => {
2140 warn!("SasResponse arrived unsigned; dropping");
2141 return;
2142 }
2143 };
2144 let their_pub =
2145 match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
2146 Ok(pk) => pk,
2147 Err(e) => {
2148 warn!(%e, "SasResponse: bad x25519 pubkey");
2149 return;
2150 }
2151 };
2152 let tx_id_bytes = match B64.decode(&tx_id) {
2153 Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
2154 let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
2155 arr.copy_from_slice(&b);
2156 arr
2157 }
2158 _ => return,
2159 };
2160 let emit = {
2161 let mut flows = self.sas_flows.lock().unwrap();
2162 let flow = match flows.get_mut(&tx_id) {
2163 Some(f) => f,
2164 None => {
2165 warn!(%tx_id, "SasResponse for unknown tx_id");
2166 return;
2167 }
2168 };
2169 if flow.partner_fingerprint != signer {
2170 warn!(
2171 expected = %flow.partner_fingerprint, got = %signer,
2172 "SasResponse signer doesn't match flow's partner; dropping"
2173 );
2174 return;
2175 }
2176 let code = crate::crypto::sas::derive_sas_code(
2177 &flow.our_secret,
2178 &their_pub,
2179 &tx_id_bytes,
2180 );
2181 flow.sas_code = Some(code.clone());
2182 code
2183 };
2184 let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
2185 room_id: room_id.to_string(),
2186 partner_fingerprint: signer,
2187 tx_id,
2188 emoji_string: emit.emoji_string(),
2189 emoji_labels: emit.emoji_labels(),
2190 decimal: emit.decimal,
2191 });
2192 }
2193 RoomMessage::CodeJoinRequest {
2194 room_id: announced_room_id,
2195 joiner_x25519_pubkey_b64,
2196 code,
2197 } => {
2198 if announced_room_id != room_id {
2199 return;
2200 }
2201 let joiner_fp = match verified_signer {
2202 Some(fp) => fp,
2203 None => {
2204 warn!("CodeJoinRequest unsigned; dropping");
2205 return;
2206 }
2207 };
2208 let our_fp = self.identity.fingerprint().to_string();
2212 if !self.is_owner(room_id, &our_fp) {
2213 return;
2214 }
2215 let now = now_unix();
2217 let (code_ok, our_session_id, wrap_input) = {
2218 let mut rooms = self.active_rooms.lock().unwrap();
2219 let room = match rooms.get_mut(room_id) {
2220 Some(r) => r,
2221 None => return,
2222 };
2223 if room.passphrase_key.is_none() {
2224 warn!("CodeJoinRequest: no passphrase key locally; can't respond");
2225 return;
2226 }
2227 let original_len = room.issued_codes.len();
2228 room.issued_codes.retain(|(c, exp)| !(c == &code && *exp > now));
2229 let matched = room.issued_codes.len() < original_len;
2230 if !matched {
2231 info!(%joiner_fp, "CodeJoinRequest: code invalid or expired; ignoring");
2232 return;
2233 }
2234 let crypto = room.crypto.as_ref().unwrap();
2235 (
2236 true,
2237 crypto.our_session_id(),
2238 crypto.our_session_key_b64(),
2239 )
2240 };
2241 let _ = code_ok;
2242 let their_pub = match crate::crypto::sas::parse_pubkey(&joiner_x25519_pubkey_b64) {
2244 Ok(pk) => pk,
2245 Err(e) => {
2246 warn!(%e, "CodeJoinRequest: bad pubkey");
2247 return;
2248 }
2249 };
2250 use x25519_dalek::{PublicKey, StaticSecret};
2251 let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
2252 let our_pub = PublicKey::from(&our_secret);
2253 let shared = our_secret.diffie_hellman(&their_pub);
2254 let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
2256 let mut wrap_key = [0u8; passphrase::KEY_LEN];
2257 hk.expand(b"huddle-code-join-v1", &mut wrap_key)
2258 .expect("32 bytes is within HKDF limits");
2259 let wrapped = match passphrase::wrap(wrap_input.as_bytes(), &wrap_key) {
2262 Ok(w) => w,
2263 Err(e) => {
2264 warn!(%e, "CodeJoinRequest: wrap failed");
2265 return;
2266 }
2267 };
2268 let response = RoomMessage::CodeJoinResponse {
2269 room_id: room_id.to_string(),
2270 target_fingerprint: joiner_fp.clone(),
2271 owner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2272 owner_session_id: our_session_id,
2273 wrapped_session_key_b64: wrapped,
2274 nonce_b64: String::new(), };
2276 if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
2277 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
2278 self.network
2279 .publish_room_message(room_id.to_string(), bytes)
2280 .await;
2281 }
2282 }
2283 info!(%joiner_fp, %room_id, "issued CodeJoinResponse");
2284 }
2285 RoomMessage::CodeJoinResponse {
2286 room_id: announced_room_id,
2287 target_fingerprint,
2288 owner_x25519_pubkey_b64,
2289 owner_session_id,
2290 wrapped_session_key_b64,
2291 nonce_b64: _,
2292 } => {
2293 if announced_room_id != room_id || target_fingerprint != our_fp {
2294 return;
2295 }
2296 let owner_fp = match verified_signer {
2297 Some(fp) => fp,
2298 None => {
2299 warn!("CodeJoinResponse unsigned; dropping");
2300 return;
2301 }
2302 };
2303 let our_secret = match self
2304 .pending_code_secrets
2305 .lock()
2306 .unwrap()
2307 .remove(&(room_id.to_string(), our_fp.clone()))
2308 {
2309 Some(s) => s,
2310 None => {
2311 warn!(%room_id, "CodeJoinResponse with no pending code-join state");
2312 return;
2313 }
2314 };
2315 let owner_pub = match crate::crypto::sas::parse_pubkey(&owner_x25519_pubkey_b64) {
2316 Ok(pk) => pk,
2317 Err(e) => {
2318 warn!(%e, "CodeJoinResponse: bad owner pubkey");
2319 return;
2320 }
2321 };
2322 let shared = our_secret.diffie_hellman(&owner_pub);
2323 let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
2324 let mut wrap_key = [0u8; passphrase::KEY_LEN];
2325 hk.expand(b"huddle-code-join-v1", &mut wrap_key)
2326 .expect("32 bytes within HKDF limits");
2327 let session_key_bytes =
2328 match passphrase::unwrap(&wrapped_session_key_b64, &wrap_key) {
2329 Ok(b) => b,
2330 Err(e) => {
2331 warn!(%e, "CodeJoinResponse: unwrap failed");
2332 return;
2333 }
2334 };
2335 let session_key_str = match String::from_utf8(session_key_bytes) {
2336 Ok(s) => s,
2337 Err(e) => {
2338 warn!(%e, "CodeJoinResponse: session key wasn't valid utf8");
2339 return;
2340 }
2341 };
2342 let mut rooms = self.active_rooms.lock().unwrap();
2344 if let Some(room) = rooms.get_mut(room_id) {
2345 if let Some(crypto) = room.crypto.as_mut() {
2346 if let Err(e) =
2347 crypto.add_inbound_session(&owner_fp, &session_key_str)
2348 {
2349 warn!(%e, "CodeJoinResponse: add_inbound_session failed");
2350 } else {
2351 info!(%room_id, %owner_fp, %owner_session_id, "code-join completed; can decrypt owner's messages");
2352 room.members.insert(owner_fp.clone());
2353 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
2354 room_id: room_id.to_string(),
2355 fingerprint: owner_fp,
2356 });
2357 }
2358 }
2359 }
2360 }
2361 RoomMessage::JoinRefused {
2362 room_id: announced_room_id,
2363 target_fingerprint,
2364 reason,
2365 } => {
2366 if announced_room_id != room_id || target_fingerprint != our_fp {
2367 return;
2368 }
2369 let _ = self.app_event_tx.send(AppEvent::Error {
2373 description: format!("join refused: {reason}"),
2374 });
2375 }
2376 RoomMessage::SasConfirm { tx_id, matched } => {
2377 let signer = match verified_signer {
2378 Some(fp) => fp,
2379 None => return,
2380 };
2381 let (room_id_done, partner_fp_done, both_done) = {
2382 let mut flows = self.sas_flows.lock().unwrap();
2383 let flow = match flows.get_mut(&tx_id) {
2384 Some(f) => f,
2385 None => return,
2386 };
2387 if flow.partner_fingerprint != signer {
2388 return;
2389 }
2390 if !matched {
2391 let _ = flow;
2393 flows.remove(&tx_id);
2394 return;
2395 }
2396 flow.their_confirmed = true;
2397 if flow.our_confirmed && flow.their_confirmed {
2398 (
2399 Some(flow.room_id.clone()),
2400 Some(flow.partner_fingerprint.clone()),
2401 true,
2402 )
2403 } else {
2404 (None, None, false)
2405 }
2406 };
2407 if both_done {
2408 if let (Some(rid), Some(pfp)) = (room_id_done, partner_fp_done) {
2409 if let Err(e) = self.finish_sas(&tx_id, &rid, &pfp).await {
2410 warn!(%e, "finish_sas failed");
2411 }
2412 }
2413 }
2414 }
2415 RoomMessage::ProfileUpdate {
2416 sender_fingerprint,
2417 username,
2418 updated_at,
2419 } => {
2420 let signer = match verified_signer {
2426 Some(fp) => fp,
2427 None => {
2428 warn!(
2429 sender = %sender_fingerprint,
2430 "dropping unsigned ProfileUpdate"
2431 );
2432 return;
2433 }
2434 };
2435 if signer != sender_fingerprint {
2436 warn!(
2437 signer = %signer,
2438 claimed = %sender_fingerprint,
2439 "dropping ProfileUpdate with signer != sender"
2440 );
2441 return;
2442 }
2443 if let Err(e) = repo::upsert_peer_profile(
2444 &self.db,
2445 &sender_fingerprint,
2446 username.as_deref(),
2447 updated_at,
2448 ) {
2449 warn!(%e, "upsert_peer_profile failed");
2450 return;
2451 }
2452 let _ = self.app_event_tx.send(AppEvent::PeerProfileUpdated {
2453 fingerprint: sender_fingerprint,
2454 username,
2455 });
2456 }
2457 }
2458 }
2459
2460 pub async fn send_file(&self, room_id: &str, path: &Path) -> Result<String> {
2468 let bytes = std::fs::read(path)?;
2469 let name = path
2470 .file_name()
2471 .map(|n| n.to_string_lossy().to_string())
2472 .unwrap_or_else(|| "untitled".into());
2473 let mime = crate::files::guess_mime(&name);
2474 let original_path = path.to_path_buf();
2475
2476 let (room_encrypted, mut maybe_session_id, encrypted_meta_opt, wire_bytes) = {
2477 let mut rooms = self.active_rooms.lock().unwrap();
2478 let room = rooms
2479 .get_mut(room_id)
2480 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2481 if room.info.encrypted {
2482 let crypto = room
2483 .crypto
2484 .as_mut()
2485 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
2486 let (ciphertext, meta) = file_encryption::encrypt_file(&bytes, crypto)?;
2487 (true, Some(meta.megolm_session_id.clone()), Some(meta), ciphertext)
2488 } else {
2489 (false, None, None, bytes)
2490 }
2491 };
2492 let _ = &mut maybe_session_id; let plan =
2495 self.file_manager
2496 .prepare_outgoing_from_bytes(&name, mime.clone(), wire_bytes)?;
2497 let file_id = plan.file_id.clone();
2498 let total = plan.chunks.len() as u32;
2499 let our_fp = self.identity.fingerprint().to_string();
2500
2501 let attachment = StoredAttachment {
2502 id: 0,
2503 room_id: room_id.to_string(),
2504 message_id: None,
2505 sender_fingerprint: our_fp.clone(),
2506 file_id: file_id.clone(),
2507 name: name.clone(),
2508 mime: mime.clone(),
2509 size_bytes: plan.size_bytes as i64,
2510 status: AttachmentStatus::Ready,
2511 cache_path: Some(self.file_manager.cache_path(&file_id).to_string_lossy().into()),
2512 saved_path: Some(original_path.to_string_lossy().into()),
2513 error: None,
2514 encrypted: room_encrypted,
2515 wrapped_key: encrypted_meta_opt.as_ref().map(|m| m.wrapped_key_b64.clone()),
2516 nonce: encrypted_meta_opt.as_ref().map(|m| m.nonce_b64.clone()),
2517 megolm_session_id: encrypted_meta_opt
2518 .as_ref()
2519 .map(|m| m.megolm_session_id.clone()),
2520 content_hash: encrypted_meta_opt.as_ref().map(|m| m.content_hash.clone()),
2521 created_at: now_unix(),
2522 };
2523 repo::upsert_attachment(&self.db, &attachment)?;
2524 let _ = self.app_event_tx.send(AppEvent::FileOffered {
2525 room_id: room_id.to_string(),
2526 file_id: file_id.clone(),
2527 name: name.clone(),
2528 size_bytes: plan.size_bytes,
2529 sender_fingerprint: our_fp.clone(),
2530 });
2531
2532 let offer = RoomMessage::FileOffer {
2534 sender_fingerprint: our_fp.clone(),
2535 file_id: file_id.clone(),
2536 name,
2537 size_bytes: plan.size_bytes,
2538 mime,
2539 chunk_count: total,
2540 encrypted_meta: encrypted_meta_opt,
2541 };
2542 if let Ok(bytes) = encode_wire(&offer) {
2543 self.network
2544 .publish_room_message(room_id.to_string(), bytes)
2545 .await;
2546 }
2547
2548 let net = self.network.clone();
2551 let room = room_id.to_string();
2552 let our = our_fp.clone();
2553 let fid = file_id.clone();
2554 let chunks = plan.chunks.clone();
2555 tokio::spawn(async move {
2556 for (i, data) in chunks.iter().enumerate() {
2557 let msg = RoomMessage::FileChunk {
2558 sender_fingerprint: our.clone(),
2559 file_id: fid.clone(),
2560 chunk_index: i as u32,
2561 total_chunks: total,
2562 data_b64: B64.encode(data),
2563 };
2564 if let Ok(bytes) = encode_wire(&msg) {
2565 net.publish_room_message(room.clone(), bytes).await;
2566 }
2567 tokio::time::sleep(Duration::from_millis(40)).await;
2568 }
2569 });
2570
2571 Ok(file_id)
2572 }
2573
2574 pub async fn save_to_downloads(&self, room_id: &str, file_id: &str) -> Result<PathBuf> {
2577 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
2578 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
2579 if !matches!(
2580 attachment.status,
2581 AttachmentStatus::Ready | AttachmentStatus::Saved
2582 ) {
2583 return Err(HuddleError::Other(format!(
2584 "attachment is not ready (status={})",
2585 attachment.status.as_str()
2586 )));
2587 }
2588 let plaintext = if attachment.encrypted
2593 && attachment.sender_fingerprint == self.identity.fingerprint()
2594 {
2595 match attachment
2596 .saved_path
2597 .as_deref()
2598 .filter(|p| Path::new(p).exists())
2599 {
2600 Some(src) => std::fs::read(src)?,
2601 None => {
2602 return Err(HuddleError::Other(
2603 "your original file has moved or been deleted — it can't be \
2604 recovered from the encrypted cache"
2605 .into(),
2606 ));
2607 }
2608 }
2609 } else {
2610 let cached = self.file_manager.read_cache(file_id)?;
2611 if attachment.encrypted {
2612 let meta = EncryptedFileMeta {
2613 megolm_session_id: attachment
2614 .megolm_session_id
2615 .clone()
2616 .ok_or_else(|| HuddleError::Other("missing megolm_session_id".into()))?,
2617 wrapped_key_b64: attachment
2618 .wrapped_key
2619 .clone()
2620 .ok_or_else(|| HuddleError::Other("missing wrapped_key".into()))?,
2621 nonce_b64: attachment
2622 .nonce
2623 .clone()
2624 .ok_or_else(|| HuddleError::Other("missing nonce".into()))?,
2625 content_hash: attachment
2626 .content_hash
2627 .clone()
2628 .ok_or_else(|| HuddleError::Other("missing content_hash".into()))?,
2629 };
2630 self.decrypt_attachment(
2631 room_id,
2632 &attachment.sender_fingerprint,
2633 &cached,
2634 &meta,
2635 )?
2636 } else {
2637 cached
2638 }
2639 };
2640 let saved = self.file_manager.write_to_downloads(&attachment.name, &plaintext)?;
2641 repo::update_attachment_paths(
2642 &self.db,
2643 room_id,
2644 file_id,
2645 None,
2646 Some(&saved.to_string_lossy()),
2647 )?;
2648 repo::update_attachment_status(&self.db, room_id, file_id, AttachmentStatus::Saved, None)?;
2649 let _ = self.app_event_tx.send(AppEvent::FileSaved {
2650 file_id: file_id.into(),
2651 path: saved.to_string_lossy().into(),
2652 });
2653 Ok(saved)
2654 }
2655
2656 pub async fn cancel_transfer(&self, room_id: &str, file_id: &str) -> Result<()> {
2658 self.file_manager.cancel_incoming(file_id);
2659 repo::update_attachment_status(
2660 &self.db,
2661 room_id,
2662 file_id,
2663 AttachmentStatus::Cancelled,
2664 None,
2665 )?;
2666 Ok(())
2667 }
2668
2669 pub fn open_saved(&self, room_id: &str, file_id: &str) -> Result<()> {
2671 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
2672 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
2673 let path = attachment
2674 .saved_path
2675 .ok_or_else(|| HuddleError::Other("not saved yet — press Enter to save first".into()))?;
2676 open_with_system(&path)
2677 }
2678
2679 pub fn list_room_attachments(&self, room_id: &str) -> Result<Vec<StoredAttachment>> {
2680 repo::list_room_attachments(&self.db, room_id)
2681 }
2682
2683 pub fn set_member_verified(
2687 &self,
2688 room_id: &str,
2689 fingerprint: &str,
2690 verified: bool,
2691 ) -> Result<()> {
2692 let members = repo::list_room_members(&self.db, room_id).unwrap_or_default();
2697 if !members.iter().any(|m| m.fingerprint == fingerprint) {
2698 repo::upsert_room_member(
2699 &self.db,
2700 &StoredRoomMember {
2701 room_id: room_id.to_string(),
2702 peer_id: String::new(),
2703 fingerprint: fingerprint.to_string(),
2704 last_seen: Some(now_unix()),
2705 verified,
2706 ed25519_pubkey: None,
2707 role: "member".into(),
2708 },
2709 )?;
2710 }
2711 repo::set_member_verified(&self.db, room_id, fingerprint, verified)
2712 }
2713
2714 pub fn verified_fingerprints(&self, room_id: &str) -> Vec<String> {
2715 repo::list_verified_fingerprints(&self.db, room_id).unwrap_or_default()
2716 }
2717
2718 pub fn is_owner(&self, room_id: &str, fingerprint: &str) -> bool {
2721 repo::list_room_owners(&self.db, room_id)
2722 .unwrap_or_default()
2723 .iter()
2724 .any(|fp| fp == fingerprint)
2725 }
2726
2727 pub fn we_are_owner(&self, room_id: &str) -> bool {
2728 self.is_owner(room_id, &self.identity.fingerprint().to_string())
2729 }
2730
2731 pub fn room_owners(&self, room_id: &str) -> Vec<String> {
2734 repo::list_room_owners(&self.db, room_id).unwrap_or_default()
2735 }
2736
2737 pub fn verified_only_inbound(&self) -> bool {
2740 repo::get_setting(&self.db, "verified_only_inbound")
2741 .unwrap_or(None)
2742 .map(|v| v == "1")
2743 .unwrap_or(false)
2744 }
2745
2746 pub fn set_verified_only_inbound(&self, on: bool) -> Result<()> {
2747 repo::set_setting(&self.db, "verified_only_inbound", if on { "1" } else { "0" })
2748 }
2749
2750 pub fn room_verified_only(&self, room_id: &str) -> bool {
2755 repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
2756 }
2757
2758 pub fn set_room_verified_only(&self, room_id: &str, on: bool) -> Result<()> {
2759 repo::set_room_verified_only(&self.db, room_id, on)
2760 }
2761
2762 pub fn onboarding_seen(&self) -> bool {
2764 repo::is_onboarding_seen(&self.db).unwrap_or(true)
2765 }
2766
2767 pub fn mark_onboarding_seen(&self) -> Result<()> {
2768 repo::mark_onboarding_seen(&self.db)
2769 }
2770
2771 pub async fn grant_owner(&self, room_id: &str, target_fingerprint: &str) -> Result<()> {
2775 let our_fp = self.identity.fingerprint().to_string();
2776 if !self.is_owner(room_id, &our_fp) {
2777 return Err(HuddleError::Other(
2778 "only an owner can grant owner".into(),
2779 ));
2780 }
2781 let msg = RoomMessage::OwnerGrant {
2782 room_id: room_id.to_string(),
2783 target_fingerprint: target_fingerprint.to_string(),
2784 };
2785 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2786 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2787 self.network
2788 .publish_room_message(room_id.to_string(), bytes)
2789 .await;
2790 repo::set_member_role(&self.db, room_id, target_fingerprint, "owner")?;
2792 Ok(())
2793 }
2794
2795 pub async fn kick_member(
2806 &self,
2807 room_id: &str,
2808 target_fingerprint: &str,
2809 ) -> Result<String> {
2810 let our_fp = self.identity.fingerprint().to_string();
2811 if !self.is_owner(room_id, &our_fp) {
2812 return Err(HuddleError::Other("only an owner can kick".into()));
2813 }
2814 if target_fingerprint == our_fp {
2815 return Err(HuddleError::Other("can't kick yourself".into()));
2816 }
2817 let info = self
2818 .active_rooms
2819 .lock()
2820 .unwrap()
2821 .get(room_id)
2822 .map(|r| r.info.clone())
2823 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2824 if !info.encrypted {
2825 let msg = RoomMessage::BanMember {
2829 room_id: room_id.to_string(),
2830 target_fingerprint: target_fingerprint.to_string(),
2831 };
2832 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2833 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2834 self.network
2835 .publish_room_message(room_id.to_string(), bytes)
2836 .await;
2837 repo::add_room_ban(
2838 &self.db,
2839 room_id,
2840 target_fingerprint,
2841 &our_fp,
2842 &env.signature_b64,
2843 now_unix(),
2844 )?;
2845 self.evict_banned_member(room_id, target_fingerprint);
2846 return Ok(String::new());
2847 }
2848 let new_passphrase = generate_join_passphrase();
2850 let msg = RoomMessage::BanMember {
2851 room_id: room_id.to_string(),
2852 target_fingerprint: target_fingerprint.to_string(),
2853 };
2854 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2855 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2856 self.network
2857 .publish_room_message(room_id.to_string(), bytes)
2858 .await;
2859 repo::add_room_ban(
2860 &self.db,
2861 room_id,
2862 target_fingerprint,
2863 &our_fp,
2864 &env.signature_b64,
2865 now_unix(),
2866 )?;
2867 self.evict_banned_member(room_id, target_fingerprint);
2868 self.rotate_room(room_id, &new_passphrase).await?;
2871 Ok(new_passphrase)
2872 }
2873
2874 pub fn generate_join_code(&self, room_id: &str) -> Result<String> {
2881 let our_fp = self.identity.fingerprint().to_string();
2882 if !self.is_owner(room_id, &our_fp) {
2883 return Err(HuddleError::Other(
2884 "only an owner can issue join codes".into(),
2885 ));
2886 }
2887 let code = generate_alphanumeric_code(8);
2888 let expires_at = now_unix() + 10 * 60;
2889 let mut rooms = self.active_rooms.lock().unwrap();
2890 let room = rooms
2891 .get_mut(room_id)
2892 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2893 let now = now_unix();
2895 room.issued_codes.retain(|(_, exp)| *exp > now);
2896 room.issued_codes.push((code.clone(), expires_at));
2897 Ok(code)
2898 }
2899
2900 pub async fn join_room_with_code(
2907 &self,
2908 room_id: &str,
2909 code: &str,
2910 ) -> Result<()> {
2911 let info = {
2913 let d = self.discovered_rooms.lock().unwrap().get(room_id).cloned();
2914 match d {
2915 Some(d) => StoredRoom {
2916 id: room_id.to_string(),
2917 name: d.name,
2918 creator_fingerprint: d.creator_fingerprint,
2919 encrypted: d.encrypted,
2920 passphrase_salt: None, created_at: now_unix(),
2922 last_active: Some(now_unix()),
2923 },
2924 None => {
2925 return Err(HuddleError::Other(format!(
2926 "room {room_id} not visible — wait for an announcement"
2927 )))
2928 }
2929 }
2930 };
2931 if !info.encrypted {
2932 return Err(HuddleError::Other(
2933 "code-join only applies to encrypted rooms".into(),
2934 ));
2935 }
2936 let our_fp = self.identity.fingerprint().to_string();
2937 use x25519_dalek::{PublicKey, StaticSecret};
2940 let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
2941 let our_pub = PublicKey::from(&our_secret);
2942 let key = (room_id.to_string(), our_fp.clone());
2947 self.pending_code_secrets
2948 .lock()
2949 .unwrap()
2950 .insert(key.clone(), our_secret);
2951 let map = self.pending_code_secrets.clone();
2956 let tx = self.app_event_tx.clone();
2957 let timeout_room = room_id.to_string();
2958 tokio::spawn(async move {
2959 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
2960 let still_pending = map.lock().unwrap().remove(&key).is_some();
2961 if still_pending {
2962 let _ = tx.send(AppEvent::CodeJoinTimedOut {
2963 room_id: timeout_room,
2964 reason: "no response from owner — code may be wrong or expired".into(),
2965 });
2966 }
2967 });
2968 repo::insert_room(&self.db, &info)?;
2975 self.active_rooms.lock().unwrap().insert(
2978 room_id.to_string(),
2979 ActiveRoom {
2980 info: info.clone(),
2981 crypto: Some(RoomCrypto::new_for_room(
2982 self.db.clone(),
2983 room_id.to_string(),
2984 our_fp.clone(),
2985 self.session_persist_key,
2986 )?),
2987 passphrase_key: None,
2988 members: {
2989 let mut s = HashSet::new();
2990 s.insert(our_fp.clone());
2991 s
2992 },
2993 typers: HashMap::new(),
2994 read_only: true,
2995 issued_codes: Vec::new(),
2996 },
2997 );
2998 self.network.subscribe_room(room_id.to_string()).await;
2999 let req = RoomMessage::CodeJoinRequest {
3001 room_id: room_id.to_string(),
3002 joiner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
3003 code: code.to_string(),
3004 };
3005 let env = crate::crypto::sign_message(&self.identity, &req)?;
3006 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
3007 self.network
3008 .publish_room_message(room_id.to_string(), bytes)
3009 .await;
3010 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
3013 room_id: room_id.to_string(),
3014 });
3015 Ok(())
3016 }
3017
3018 pub async fn sas_start(&self, room_id: &str, target_fingerprint: &str) -> Result<String> {
3024 let (tx_id_bytes, our_secret, our_pub) = crate::crypto::sas::new_session();
3025 let tx_id = B64.encode(tx_id_bytes);
3026 let msg = RoomMessage::SasInit {
3027 tx_id: tx_id.clone(),
3028 ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
3029 target_fingerprint: target_fingerprint.to_string(),
3030 };
3031 let env = crate::crypto::sign_message(&self.identity, &msg)?;
3032 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
3033 self.sas_flows.lock().unwrap().insert(
3034 tx_id.clone(),
3035 SasFlow {
3036 room_id: room_id.to_string(),
3037 partner_fingerprint: target_fingerprint.to_string(),
3038 our_secret,
3039 sas_code: None,
3040 our_confirmed: false,
3041 their_confirmed: false,
3042 },
3043 );
3044 self.network
3045 .publish_room_message(room_id.to_string(), bytes)
3046 .await;
3047 Ok(tx_id)
3048 }
3049
3050 pub async fn sas_match(&self, tx_id: &str) -> Result<()> {
3054 let (room_id, partner_fp, both_done) = {
3055 let mut flows = self.sas_flows.lock().unwrap();
3056 let flow = flows
3057 .get_mut(tx_id)
3058 .ok_or_else(|| HuddleError::Other("unknown SAS tx_id".into()))?;
3059 flow.our_confirmed = true;
3060 (
3061 flow.room_id.clone(),
3062 flow.partner_fingerprint.clone(),
3063 flow.our_confirmed && flow.their_confirmed,
3064 )
3065 };
3066 let msg = RoomMessage::SasConfirm {
3067 tx_id: tx_id.to_string(),
3068 matched: true,
3069 };
3070 let env = crate::crypto::sign_message(&self.identity, &msg)?;
3071 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
3072 self.network
3073 .publish_room_message(room_id.clone(), bytes)
3074 .await;
3075 if both_done {
3076 self.finish_sas(tx_id, &room_id, &partner_fp).await?;
3077 }
3078 Ok(())
3079 }
3080
3081 pub fn sas_cancel(&self, tx_id: &str) {
3085 self.sas_flows.lock().unwrap().remove(tx_id);
3086 }
3087
3088 async fn finish_sas(
3091 &self,
3092 tx_id: &str,
3093 room_id: &str,
3094 partner_fingerprint: &str,
3095 ) -> Result<()> {
3096 repo::set_member_verified(&self.db, room_id, partner_fingerprint, true)?;
3097 repo::add_verified_peer(&self.db, partner_fingerprint, now_unix())?;
3098 self.sas_flows.lock().unwrap().remove(tx_id);
3099 let _ = self.app_event_tx.send(AppEvent::SasVerified {
3100 room_id: room_id.to_string(),
3101 partner_fingerprint: partner_fingerprint.to_string(),
3102 });
3103 Ok(())
3104 }
3105
3106 fn evict_banned_member(&self, room_id: &str, fingerprint: &str) {
3111 if let Some(room) = self.active_rooms.lock().unwrap().get_mut(room_id) {
3112 room.members.remove(fingerprint);
3113 }
3114 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
3115 room_id: room_id.to_string(),
3116 fingerprint: fingerprint.to_string(),
3117 });
3118 }
3119
3120 pub fn display_name(&self) -> Option<String> {
3121 repo::get_display_name(&self.db).unwrap_or(None)
3122 }
3123
3124 pub fn set_display_name(&self, name: Option<&str>) -> Result<()> {
3125 repo::set_display_name(&self.db, name)
3126 }
3127
3128 pub async fn set_username(&self, name: Option<&str>) -> Result<()> {
3134 repo::set_display_name(&self.db, name)?;
3135 let msg = RoomMessage::ProfileUpdate {
3136 sender_fingerprint: self.identity.fingerprint().to_string(),
3137 username: name.map(|s| s.to_string()),
3138 updated_at: now_unix_ms(),
3139 };
3140 let env = crate::crypto::sign_message(&self.identity, &msg)?;
3141 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
3142 let rooms: Vec<String> = self.active_rooms.lock().unwrap().keys().cloned().collect();
3143 for room_id in rooms {
3144 self.network
3145 .publish_room_message(room_id, bytes.clone())
3146 .await;
3147 }
3148 Ok(())
3149 }
3150
3151 pub fn lookup_username(&self, fingerprint: &str) -> Option<String> {
3156 repo::get_peer_username(&self.db, fingerprint).unwrap_or(None)
3157 }
3158
3159 pub fn lookup_member_display_name(&self, fingerprint: &str) -> Option<String> {
3163 self.lookup_username(fingerprint)
3164 }
3165
3166 pub fn is_room_muted(&self, room_id: &str) -> bool {
3167 repo::is_room_muted(&self.db, room_id).unwrap_or(false)
3168 }
3169
3170 pub fn list_room_bans(&self, room_id: &str) -> Vec<String> {
3175 repo::list_room_bans(&self.db, room_id).unwrap_or_default()
3176 }
3177
3178 pub fn list_blocked_peers(&self) -> Vec<String> {
3182 repo::list_blocked_peers(&self.db).unwrap_or_default()
3183 }
3184
3185 pub fn unblock_peer(&self, fingerprint: &str) -> Result<()> {
3189 repo::unblock_peer(&self.db, fingerprint)
3190 }
3191
3192 pub fn is_room_read_only(&self, room_id: &str) -> bool {
3198 self.active_rooms
3199 .lock()
3200 .unwrap()
3201 .get(room_id)
3202 .map(|r| r.read_only)
3203 .unwrap_or(false)
3204 }
3205
3206 pub fn set_room_muted(&self, room_id: &str, muted: bool) -> Result<()> {
3207 repo::set_room_muted(&self.db, room_id, muted)
3208 }
3209
3210 pub async fn broadcast_typing(&self, room_id: &str) {
3213 if !self.active_rooms.lock().unwrap().contains_key(room_id) {
3214 return;
3215 }
3216 let msg = RoomMessage::Typing {
3217 sender_fingerprint: self.identity.fingerprint().to_string(),
3218 };
3219 if let Ok(bytes) = encode_wire(&msg) {
3220 self.network
3221 .publish_room_message(room_id.to_string(), bytes)
3222 .await;
3223 }
3224 }
3225
3226 pub fn typers_in_room(&self, room_id: &str) -> Vec<String> {
3229 let now = now_unix();
3230 let mut rooms = self.active_rooms.lock().unwrap();
3231 let room = match rooms.get_mut(room_id) {
3232 Some(r) => r,
3233 None => return Vec::new(),
3234 };
3235 room.typers.retain(|_, exp| *exp > now);
3236 let mut v: Vec<String> = room.typers.keys().cloned().collect();
3237 v.sort();
3238 v
3239 }
3240
    /// Rotate an encrypted room to a new passphrase.
    ///
    /// A fresh random salt is generated and a key derived from
    /// `new_passphrase`. The active room's crypto state is replaced with a
    /// newly created one and its passphrase key and salt are updated in
    /// memory. A signed `RotateRoomKey` message carrying the new salt is then
    /// published, a member announce is broadcast, and finally the updated
    /// room record is written to the database.
    ///
    /// # Errors
    /// Fails when `new_passphrase` is empty, key derivation fails, we are not
    /// in the room, the room is not encrypted, the new crypto state cannot be
    /// created, or the final database write fails. Failures to sign, encode
    /// or announce are not returned (the announce failure is logged).
    pub async fn rotate_room(&self, room_id: &str, new_passphrase: &str) -> Result<()> {
        if new_passphrase.is_empty() {
            return Err(HuddleError::Other("new passphrase is empty".into()));
        }
        let new_salt = passphrase::random_salt();
        let new_key = passphrase::derive_key(new_passphrase, &new_salt)?;

        // Mutate the in-memory room inside a scoped lock so the guard is gone
        // before the first `.await` below.
        let info = {
            let mut rooms = self.active_rooms.lock().unwrap();
            let room = rooms
                .get_mut(room_id)
                .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
            if !room.info.encrypted {
                return Err(HuddleError::Other(
                    "rotation only applies to encrypted rooms".into(),
                ));
            }
            // Replace the room's crypto state with a newly constructed one.
            let new_crypto = RoomCrypto::new_for_room(
                self.db.clone(),
                room_id.to_string(),
                self.identity.fingerprint().to_string(),
                self.session_persist_key,
            )?;
            room.crypto = Some(new_crypto);
            room.passphrase_key = Some(new_key);
            room.info.passphrase_salt = Some(new_salt.to_vec());
            room.info.clone()
        };

        let rot = RoomMessage::RotateRoomKey {
            rotator_fingerprint: self.identity.fingerprint().to_string(),
            new_salt: new_salt.to_vec(),
        };
        // Best effort: if signing or encoding fails the notice is simply not
        // sent and rotation continues locally.
        // NOTE(review): in that case peers never learn the new salt — confirm
        // this is intended.
        if let Ok(env) = crate::crypto::sign_message(&self.identity, &rot) {
            if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
                self.network
                    .publish_room_message(room_id.to_string(), bytes)
                    .await;
            }
        }
        // Announce failure is logged but does not abort the rotation.
        if let Err(e) = self.broadcast_member_announce(room_id).await {
            warn!(%e, "rotate: broadcast announce failed");
        }

        // Persist the room record carrying the new salt last.
        repo::insert_room(&self.db, &info)?;
        Ok(())
    }
3308
3309 pub async fn accept_rotation(
3313 &self,
3314 room_id: &str,
3315 new_salt: &[u8],
3316 new_passphrase: &str,
3317 ) -> Result<()> {
3318 let new_key = passphrase::derive_key(new_passphrase, new_salt)?;
3319 let info = {
3320 let mut rooms = self.active_rooms.lock().unwrap();
3321 let room = rooms
3322 .get_mut(room_id)
3323 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
3324 room.passphrase_key = Some(new_key);
3325 room.info.passphrase_salt = Some(new_salt.to_vec());
3326 room.info.clone()
3327 };
3328 let req = RoomMessage::SessionKeyRequest {
3332 requester_fingerprint: self.identity.fingerprint().to_string(),
3333 };
3334 if let Ok(bytes) = encode_wire(&req) {
3335 self.network
3336 .publish_room_message(room_id.to_string(), bytes)
3337 .await;
3338 }
3339 repo::insert_room(&self.db, &info)?;
3340 Ok(())
3341 }
3342
3343 #[allow(clippy::too_many_arguments)]
3348 fn handle_file_offer(
3349 &self,
3350 room_id: &str,
3351 sender_fingerprint: String,
3352 file_id: String,
3353 name: String,
3354 size_bytes: u64,
3355 mime: Option<String>,
3356 _chunk_count: u32,
3357 encrypted_meta: Option<EncryptedFileMeta>,
3358 ) {
3359 let encrypted = encrypted_meta.is_some();
3360 let attachment = StoredAttachment {
3361 id: 0,
3362 room_id: room_id.to_string(),
3363 message_id: None,
3364 sender_fingerprint: sender_fingerprint.clone(),
3365 file_id: file_id.clone(),
3366 name: name.clone(),
3367 mime,
3368 size_bytes: size_bytes as i64,
3369 status: AttachmentStatus::Offered,
3370 cache_path: None,
3371 saved_path: None,
3372 error: None,
3373 encrypted,
3374 wrapped_key: encrypted_meta.as_ref().map(|m| m.wrapped_key_b64.clone()),
3375 nonce: encrypted_meta.as_ref().map(|m| m.nonce_b64.clone()),
3376 megolm_session_id: encrypted_meta.as_ref().map(|m| m.megolm_session_id.clone()),
3377 content_hash: encrypted_meta.as_ref().map(|m| m.content_hash.clone()),
3378 created_at: now_unix(),
3379 };
3380 if let Err(e) = repo::upsert_attachment(&self.db, &attachment) {
3381 warn!(%e, "upsert attachment");
3382 return;
3383 }
3384 self.file_manager.set_expected_size(&file_id, size_bytes);
3387 let _ = self.app_event_tx.send(AppEvent::FileOffered {
3388 room_id: room_id.to_string(),
3389 file_id,
3390 name,
3391 size_bytes,
3392 sender_fingerprint,
3393 });
3394 }
3395
3396 fn handle_file_chunk(
3397 &self,
3398 room_id: &str,
3399 _sender_fingerprint: String,
3400 file_id: String,
3401 chunk_index: u32,
3402 total_chunks: u32,
3403 data_b64: String,
3404 ) {
3405 let data = match B64.decode(&data_b64) {
3406 Ok(d) => d,
3407 Err(e) => {
3408 warn!(%e, "bad chunk base64");
3409 return;
3410 }
3411 };
3412 let expected_size = match repo::get_attachment(&self.db, room_id, &file_id) {
3416 Ok(Some(a)) => {
3417 if matches!(
3418 a.status,
3419 AttachmentStatus::Cancelled | AttachmentStatus::Failed
3420 ) {
3421 return;
3422 }
3423 a.size_bytes as u64
3424 }
3425 Ok(None) => crate::files::MAX_FILE_SIZE,
3426 Err(e) => {
3427 warn!(%e, "get attachment for chunk");
3428 crate::files::MAX_FILE_SIZE
3429 }
3430 };
3431
3432 let result = self.file_manager.accept_chunk(
3433 &file_id,
3434 chunk_index,
3435 total_chunks,
3436 data,
3437 expected_size,
3438 );
3439 match result {
3440 Ok(None) => {
3441 let _ = repo::update_attachment_status(
3443 &self.db,
3444 room_id,
3445 &file_id,
3446 AttachmentStatus::Downloading,
3447 None,
3448 );
3449 let bytes_so_far = self
3452 .file_manager
3453 .progress(&file_id)
3454 .map(|(b, _)| b)
3455 .unwrap_or(0);
3456 let _ = self.app_event_tx.send(AppEvent::FileProgress {
3457 file_id: file_id.clone(),
3458 bytes_received: bytes_so_far,
3459 total_bytes: expected_size,
3460 });
3461 }
3462 Ok(Some(completed)) => {
3463 let _ = repo::update_attachment_paths(
3464 &self.db,
3465 room_id,
3466 &file_id,
3467 Some(&completed.cache_path.to_string_lossy()),
3468 None,
3469 );
3470 let _ = repo::update_attachment_status(
3471 &self.db,
3472 room_id,
3473 &file_id,
3474 AttachmentStatus::Ready,
3475 None,
3476 );
3477 let _ = self.app_event_tx.send(AppEvent::FileReady {
3478 file_id: file_id.clone(),
3479 });
3480 }
3481 Err(e) => {
3482 let msg = e.to_string();
3483 warn!(%msg, "chunk processing failed");
3484 let _ = repo::update_attachment_status(
3485 &self.db,
3486 room_id,
3487 &file_id,
3488 AttachmentStatus::Failed,
3489 Some(&msg),
3490 );
3491 let _ = self.app_event_tx.send(AppEvent::FileFailed {
3492 file_id: file_id.clone(),
3493 reason: msg,
3494 });
3495 }
3496 }
3497 }
3498
3499 fn maybe_emit_mention(&self, room_id: &str, body: &str) {
3502 let full = self.identity.fingerprint().to_lowercase();
3503 let short: &str = full.split('-').next().unwrap_or(&full);
3505 let lower = body.to_lowercase();
3506 let hit = lower.contains(full.as_str())
3510 || lower
3511 .split(|c: char| !c.is_ascii_hexdigit())
3512 .any(|tok| tok == short);
3513 if hit {
3514 let _ = self.app_event_tx.send(AppEvent::MentionReceived {
3515 room_id: room_id.to_string(),
3516 body: body.to_string(),
3517 });
3518 }
3519 }
3520
3521 fn decrypt_attachment(
3522 &self,
3523 room_id: &str,
3524 sender_fingerprint: &str,
3525 ciphertext: &[u8],
3526 meta: &EncryptedFileMeta,
3527 ) -> Result<Vec<u8>> {
3528 let mut rooms = self.active_rooms.lock().unwrap();
3529 let room = rooms
3530 .get_mut(room_id)
3531 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
3532 let crypto = room
3533 .crypto
3534 .as_mut()
3535 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
3536 file_encryption::decrypt_file(ciphertext, meta, crypto, sender_fingerprint)
3537 }
3538
    /// Irreversibly erase this installation's local data ("go dark").
    ///
    /// Steps, in order:
    /// 1. Unless `session_persist_key` is all zeros, re-derive the
    ///    `megolm-persist` subkey from `master_passphrase` and the keychain
    ///    salt and compare it with the key in memory; a mismatch aborts.
    /// 2. Leave every active room, with a 2 second budget for all of them.
    /// 3. Shut the network down and pause for 300 ms.
    /// 4. Overwrite and delete the known data files, rotated logs and
    ///    everything up to two levels deep under `files/`, then try to remove
    ///    the now-empty directories.
    /// 5. Emit `AppEvent::WentDark`.
    ///
    /// # Errors
    /// Fails when the salt cannot be loaded, key derivation fails, or the
    /// passphrase does not match. Problems while leaving rooms or wiping
    /// files are not returned to the caller.
    pub async fn go_dark(&self, master_passphrase: &str) -> Result<()> {
        // NOTE(review): an all-zero key is presumably the "no master
        // passphrase configured" sentinel — confirm. In that case the
        // passphrase check is skipped entirely.
        let no_master = self.session_persist_key == [0u8; 32];
        if !no_master {
            let salt = storage::keychain::load_or_create_salt()?;
            let candidate_master =
                storage::keychain::derive_master_key(master_passphrase, &salt)?;
            let candidate_subkey =
                storage::keychain::derive_subkey(&candidate_master, b"megolm-persist");
            // Compare without early exit on the first differing byte.
            if !ct_eq_32(&candidate_subkey, &self.session_persist_key) {
                return Err(HuddleError::Other(
                    "incorrect master passphrase".into(),
                ));
            }
        }

        // Snapshot the ids so the lock is not held while leaving rooms.
        let room_ids: Vec<String> = self
            .active_rooms
            .lock()
            .unwrap()
            .keys()
            .cloned()
            .collect();
        // Leaving is best effort: the whole loop is abandoned after 2 s and
        // individual failures are only logged.
        let _ = tokio::time::timeout(Duration::from_secs(2), async {
            for room_id in &room_ids {
                if let Err(e) = self.leave_room(room_id).await {
                    warn!(%room_id, %e, "go_dark: leave_room failed");
                }
            }
        })
        .await;

        self.network.shutdown().await;
        // Short pause after shutdown before touching the files.
        tokio::time::sleep(Duration::from_millis(300)).await;

        let data_dir = config::data_dir();
        // Fixed set of files directly inside the data directory.
        let candidates = [
            "huddle.db",
            "huddle.db-shm",
            "huddle.db-wal",
            "keychain.salt",
            "huddle.log",
            "config.toml",
        ];
        for name in &candidates {
            let path = data_dir.join(name);
            wipe_file(&path);
        }
        // Any file named `huddle.log.<suffix>` is wiped as well.
        if let Ok(read) = std::fs::read_dir(&data_dir) {
            for entry in read.flatten() {
                if let Some(name) = entry.file_name().to_str() {
                    if name.starts_with("huddle.log.") {
                        wipe_file(&entry.path());
                    }
                }
            }
        }
        let files_dir = data_dir.join("files");
        // Wipe files directly under `files/` and one directory level below;
        // deeper nesting is not traversed.
        if let Ok(read) = std::fs::read_dir(&files_dir) {
            for entry in read.flatten() {
                let path = entry.path();
                if path.is_file() {
                    wipe_file(&path);
                } else if path.is_dir() {
                    if let Ok(inner) = std::fs::read_dir(&path) {
                        for inner_entry in inner.flatten() {
                            if inner_entry.path().is_file() {
                                wipe_file(&inner_entry.path());
                            }
                        }
                    }
                    // `remove_dir` only succeeds on an empty directory.
                    let _ = std::fs::remove_dir(&path);
                }
            }
        }
        let _ = std::fs::remove_dir(&files_dir);
        let _ = std::fs::remove_dir(&data_dir);

        let _ = self.app_event_tx.send(AppEvent::WentDark);
        Ok(())
    }
3635}
3636
/// Normalize a user-typed identifier into the canonical fingerprint form:
/// 24 lowercase hex digits in six dash-separated groups of four.
///
/// Surrounding whitespace, leading `HD-` / `hd-` prefixes and any dashes are
/// ignored. Returns `None` unless exactly 24 ASCII hex digits remain.
fn normalize_to_fingerprint(input: &str) -> Option<String> {
    let unbranded = input
        .trim()
        .trim_start_matches("HD-")
        .trim_start_matches("hd-");
    let digits: Vec<char> = unbranded.chars().filter(|&ch| ch != '-').collect();
    let well_formed = digits.len() == 24 && digits.iter().all(|ch| ch.is_ascii_hexdigit());
    if !well_formed {
        return None;
    }
    let groups: Vec<String> = digits
        .chunks(4)
        .map(|group| group.iter().map(|ch| ch.to_ascii_lowercase()).collect())
        .collect();
    Some(groups.join("-"))
}
3661
3662fn address_preference(addr: &str) -> u8 {
3668 if addr.contains("/p2p-circuit") {
3669 return 9; }
3671 if let Some(rest) = addr.strip_prefix("/ip4/") {
3672 if let Some(ip_str) = rest.split('/').next() {
3673 if let Ok(ip) = ip_str.parse::<std::net::Ipv4Addr>() {
3674 if ip.is_loopback() {
3675 return 1; }
3677 if is_rfc1918(&ip) || ip.is_link_local() {
3678 return 0; }
3680 return 3; }
3682 }
3683 return 3;
3684 }
3685 if addr.starts_with("/ip6/") {
3686 return 4;
3687 }
3688 if addr.starts_with("/dns4/") || addr.starts_with("/dns6/") || addr.starts_with("/dnsaddr/") {
3689 return 5;
3690 }
3691 7
3692}
3693
/// Whether `ip` lies in one of the RFC 1918 private ranges:
/// `10.0.0.0/8`, `172.16.0.0/12` or `192.168.0.0/16`.
fn is_rfc1918(ip: &std::net::Ipv4Addr) -> bool {
    match ip.octets() {
        [10, ..] => true,
        [172, second, ..] => (16..=31).contains(&second),
        [192, 168, ..] => true,
        _ => false,
    }
}
3703
/// Abbreviate a fingerprint for display: `HD-` followed by its first four
/// non-dash characters in ASCII upper case and a trailing ellipsis.
fn short_fp_for_msg(fingerprint: &str) -> String {
    let mut label = String::from("HD-");
    label.extend(
        fingerprint
            .chars()
            .filter(|&ch| ch != '-')
            .take(4)
            .map(|ch| ch.to_ascii_uppercase()),
    );
    label.push('…');
    label
}
3716
/// Compare two 32-byte arrays for equality.
///
/// All 32 byte pairs are always combined (XOR, then OR-accumulated) before
/// the single final comparison, so the loop does not exit early on the first
/// mismatch.
fn ct_eq_32(a: &[u8; 32], b: &[u8; 32]) -> bool {
    let accumulated = a
        .iter()
        .zip(b.iter())
        .fold(0u8, |acc, (left, right)| acc | (left ^ right));
    accumulated == 0
}
3727
3728fn wipe_file(path: &Path) {
3732 use std::io::Write;
3733 if let Ok(meta) = std::fs::metadata(path) {
3734 if let Ok(mut f) = std::fs::OpenOptions::new().write(true).open(path) {
3735 let zeros = vec![0u8; meta.len() as usize];
3736 let _ = f.write_all(&zeros);
3737 let _ = f.sync_all();
3738 }
3739 }
3740 if let Err(e) = std::fs::remove_file(path) {
3741 if e.kind() != std::io::ErrorKind::NotFound {
3742 warn!(?path, %e, "wipe_file: remove failed");
3743 }
3744 }
3745}
3746
3747fn open_with_system(path: &str) -> Result<()> {
3749 #[cfg(target_os = "macos")]
3750 let cmd = "open";
3751 #[cfg(target_os = "linux")]
3752 let cmd = "xdg-open";
3753 #[cfg(target_os = "windows")]
3754 let cmd = "cmd";
3755 #[cfg(target_os = "windows")]
3756 let args = vec!["/C", "start", "", path];
3757 #[cfg(not(target_os = "windows"))]
3758 let args = vec![path];
3759
3760 std::process::Command::new(cmd)
3761 .args(args)
3762 .spawn()
3763 .map_err(|e| HuddleError::Other(format!("spawn opener: {e}")))?;
3764 Ok(())
3765}
3766
/// Process-wide, lazily initialised map from a string key to a byte vector,
/// guarded by a mutex. It starts out empty.
// NOTE(review): by its name this presumably caches passphrase salts keyed by
// room id — confirm against the code that reads and writes it.
static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
    std::sync::LazyLock::new(Mutex::default);
3771
3772pub fn salt_len() -> usize {
3777 SALT_LEN
3778}
3779
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Never panics: if the system clock is set before the epoch the result is
/// the (negative) number of whole seconds before it, and values outside the
/// `i64` range saturate. This matters because callers invoke it while holding
/// mutexes, where a panic would poison the lock.
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX),
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_secs())
            .map_or(i64::MIN, |secs| -secs),
    }
}
3786
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Never panics: if the system clock is set before the epoch the result is
/// the (negative) number of whole milliseconds before it, and values outside
/// the `i64` range saturate instead of wrapping.
fn now_unix_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX),
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_millis())
            .map_or(i64::MIN, |millis| -millis),
    }
}
3793
3794fn generate_join_passphrase() -> String {
3800 use rand::RngCore;
3801 let mut bytes = [0u8; 16];
3802 rand::thread_rng().fill_bytes(&mut bytes);
3803 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes)
3806}
3807
3808fn generate_alphanumeric_code(len: usize) -> String {
3813 use rand::Rng;
3814 const ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
3815 let mut rng = rand::thread_rng();
3816 let mut out = String::with_capacity(len + 1);
3817 for i in 0..len {
3818 if i == 4 && len == 8 {
3819 out.push('-'); }
3821 let idx = rng.gen_range(0..ALPHABET.len());
3822 out.push(ALPHABET[idx] as char);
3823 }
3824 out
3825}
3826
/// Tests for `parse_dial_address`: accepted input shapes and rejected ones.
#[cfg(test)]
mod parser_tests {
    use super::parse_dial_address;

    /// Parse `input` and render the resulting multiaddr as text, panicking if
    /// parsing fails.
    fn dial_string(input: &str) -> String {
        parse_dial_address(input).unwrap().to_string()
    }

    #[test]
    fn parses_ipv4_port() {
        assert_eq!(dial_string("10.3.72.53:9027"), "/ip4/10.3.72.53/tcp/9027");
    }

    #[test]
    fn parses_bracketed_ipv6() {
        assert_eq!(dial_string("[::1]:9027"), "/ip6/::1/tcp/9027");
    }

    #[test]
    fn rejects_unbracketed_ipv6() {
        // The error text is expected to point the user at bracket syntax.
        let message = parse_dial_address("fe80::1:9027").unwrap_err().to_string();
        assert!(message.contains("brackets"));
    }

    #[test]
    fn passes_through_raw_multiaddr() {
        let raw = "/ip4/1.2.3.4/tcp/9000";
        assert_eq!(dial_string(raw), "/ip4/1.2.3.4/tcp/9000");
    }

    #[test]
    fn empty_address_is_error() {
        let outcome = parse_dial_address("   ");
        assert!(outcome.is_err());
    }

    #[test]
    fn rejects_bad_port() {
        let outcome = parse_dial_address("1.2.3.4:notaport");
        assert!(outcome.is_err());
    }
}
3865
/// Tests for address ranking and fingerprint normalization.
#[cfg(test)]
mod transport_preference_tests {
    use super::{address_preference, normalize_to_fingerprint};

    #[test]
    fn lan_beats_public_beats_circuit() {
        let lan = address_preference("/ip4/192.168.1.5/tcp/9027");
        let pub_v4 = address_preference("/ip4/8.8.8.8/tcp/9027");
        let circuit =
            address_preference("/ip4/1.2.3.4/tcp/4001/p2p/12D3Koo/p2p-circuit/p2p/12D3KooXYZ");

        assert!(lan < pub_v4, "LAN {} should beat public {}", lan, pub_v4);
        assert!(
            pub_v4 < circuit,
            "public {} should beat circuit {}",
            pub_v4,
            circuit
        );
    }

    #[test]
    fn all_rfc1918_ranges_are_lan() {
        let class_c = address_preference("/ip4/192.168.0.1/tcp/9027");
        let class_b = address_preference("/ip4/172.16.0.1/tcp/9027");

        assert_eq!(address_preference("/ip4/10.0.0.1/tcp/9027"), class_c);
        assert_eq!(class_b, class_c);
        // 172.32.x.x is just outside the 172.16.0.0/12 private block.
        assert!(address_preference("/ip4/172.32.0.1/tcp/9027") > class_b);
    }

    #[test]
    fn normalize_id_accepts_branded_and_raw() {
        let canon = "aaaa-bbbb-cccc-dddd-eeee-ffff";
        let accepted = [
            "HD-AAAA-BBBB-CCCC-DDDD-EEEE-FFFF",
            "aaaabbbbccccddddeeeeffff",
            canon,
        ];
        for input in accepted {
            assert_eq!(normalize_to_fingerprint(input).as_deref(), Some(canon));
        }
        for rejected in ["alice", "HD-ZZZZ"] {
            assert!(normalize_to_fingerprint(rejected).is_none());
        }
    }
}