1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use base64::engine::general_purpose::STANDARD as B64;
9use base64::Engine;
10use libp2p::{Multiaddr, PeerId};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info, warn};
13
14use crate::config;
15use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
16use crate::crypto::RoomCrypto;
17use crate::error::{HuddleError, Result};
18use crate::files::encryption::{self as file_encryption, EncryptedFileMeta};
19use crate::files::FileManager;
20use crate::identity::Identity;
21use crate::network::events::NetworkEvent;
22use crate::network::protocol::{encode_wire, RoomAnnouncement, RoomMessage, WireMessage};
23use crate::network::{self, NetworkHandle, NetworkMode};
24use crate::storage::repo::{
25 self, derive_room_id, AttachmentStatus, KnownPeer, StoredAttachment, StoredRoom,
26 StoredRoomMember,
27};
28use crate::storage::{self, Db};
29
30pub use self::events::{AppEvent, DiscoveredRoom};
31
32#[derive(Debug, Clone)]
35pub struct KnownPeerStatus {
36 pub address: String,
37 pub label: Option<String>,
38 pub last_connected_at: Option<i64>,
39 pub connected_peer_id: Option<PeerId>,
40}
41
42pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
45 let trimmed = input.trim();
46 if trimmed.is_empty() {
47 return Err(HuddleError::Other("address is empty".into()));
48 }
49 if trimmed.starts_with('/') {
50 return trimmed
51 .parse::<Multiaddr>()
52 .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
53 }
54 if let Some(rest) = trimmed.strip_prefix('[') {
55 let (host, port) = rest
56 .split_once("]:")
57 .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
58 let port: u16 = port
59 .parse()
60 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
61 return format!("/ip6/{}/tcp/{}", host, port)
62 .parse::<Multiaddr>()
63 .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
64 }
65 let (host, port) = trimmed
66 .rsplit_once(':')
67 .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
68 if host.contains(':') {
69 return Err(HuddleError::Other(format!(
70 "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
71 )));
72 }
73 let port: u16 = port
74 .parse()
75 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
76 format!("/ip4/{}/tcp/{}", host, port)
77 .parse::<Multiaddr>()
78 .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
79}
80
81struct ActiveRoom {
83 info: StoredRoom,
84 crypto: Option<RoomCrypto>,
85 passphrase_key: Option<[u8; KEY_LEN]>,
88 members: HashSet<String>,
90 typers: HashMap<String, i64>,
93 read_only: bool,
100 issued_codes: Vec<(String, i64)>,
104}
105
106const TYPING_TTL_SECS: i64 = 3;
107
108const DISCOVERED_TTL_SECS: i64 = 45;
111const ANNOUNCE_INTERVAL_SECS: u64 = 15;
112
113struct SasFlow {
117 room_id: String,
118 partner_fingerprint: String,
119 our_secret: x25519_dalek::StaticSecret,
120 sas_code: Option<crate::crypto::sas::SasCode>,
122 our_confirmed: bool,
123 their_confirmed: bool,
124}
125
126#[derive(Clone)]
127pub struct AppHandle {
128 identity: Arc<Identity>,
129 network: NetworkHandle,
130 mode: NetworkMode,
131 active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
132 discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
133 restorable_rooms: Arc<Mutex<HashMap<String, StoredRoom>>>,
137 connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
140 file_manager: Arc<FileManager>,
142 db: Db,
143 session_persist_key: [u8; 32],
147 sas_flows: Arc<Mutex<HashMap<String, SasFlow>>>,
150 pending_code_secrets:
157 Arc<Mutex<HashMap<(String, String), x25519_dalek::StaticSecret>>>,
158 pending_invite_dials: Arc<Mutex<HashMap<String, String>>>,
171 nat_reachable_addrs: Arc<Mutex<HashSet<String>>>,
177 relay_circuit_addrs: Arc<Mutex<HashSet<String>>>,
183 host_addr_dial_attempts: Arc<Mutex<HashMap<String, i64>>>,
188 last_profile_broadcast_at_ms: Arc<Mutex<HashMap<String, i64>>>,
195 app_event_tx: broadcast::Sender<AppEvent>,
196}
197
198const HOST_ADDR_DIAL_BACKOFF_SECS: i64 = 300;
201
202const PROFILE_REBROADCAST_FLOOR_MS: i64 = 60_000;
206
207impl AppHandle {
208 pub async fn start() -> Result<Self> {
209 Self::start_with_options(NetworkMode::Mdns, 0, None, Vec::new()).await
210 }
211
212 pub async fn start_with_options(
213 mode: NetworkMode,
214 port: u16,
215 master_key: Option<&[u8; 32]>,
216 relays: Vec<Multiaddr>,
217 ) -> Result<Self> {
218 config::ensure_data_dir()?;
219 let session_persist_key = match master_key {
224 Some(mk) => storage::keychain::derive_subkey(mk, b"megolm-persist"),
225 None => [0u8; 32],
226 };
227 let db = storage::open_db(&config::db_path(), master_key)?;
228 Self::start_with_db_and_options(db, mode, port, session_persist_key, relays).await
229 }
230
231 pub async fn start_with_db(db: Db) -> Result<Self> {
232 Self::start_with_db_and_options(db, NetworkMode::Mdns, 0, [0u8; 32], Vec::new()).await
233 }
234
235 pub async fn start_with_db_and_options(
236 db: Db,
237 mode: NetworkMode,
238 port: u16,
239 session_persist_key: [u8; 32],
240 relays: Vec<Multiaddr>,
241 ) -> Result<Self> {
242 let identity = Self::load_or_create_identity(&db)?;
243 let identity = Arc::new(identity);
244 info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, relay_count = relays.len(), "identity loaded");
245
246 let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
247 let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
248 let network =
249 network::start_network_with(&identity, net_event_tx, mode, port, relays)?;
250
251 let active_rooms = Arc::new(Mutex::new(HashMap::new()));
252 let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
253 let restorable_rooms = Arc::new(Mutex::new(HashMap::new()));
254 let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
255 let file_manager = Arc::new(FileManager::new(&config::data_dir())?);
256
257 let handle = Self {
258 identity,
259 network,
260 mode,
261 active_rooms,
262 discovered_rooms,
263 restorable_rooms,
264 connected_dial_addrs,
265 file_manager,
266 db,
267 session_persist_key,
268 sas_flows: Arc::new(Mutex::new(HashMap::new())),
269 pending_code_secrets: Arc::new(Mutex::new(HashMap::new())),
270 pending_invite_dials: Arc::new(Mutex::new(HashMap::new())),
271 nat_reachable_addrs: Arc::new(Mutex::new(HashSet::new())),
272 relay_circuit_addrs: Arc::new(Mutex::new(HashSet::new())),
273 host_addr_dial_attempts: Arc::new(Mutex::new(HashMap::new())),
274 last_profile_broadcast_at_ms: Arc::new(Mutex::new(HashMap::new())),
275 app_event_tx,
276 };
277
278 handle.spawn_event_processor(net_event_rx);
279 handle.spawn_announcement_ticker();
280 handle.spawn_discovered_room_pruner();
281 handle.spawn_known_peer_reconnector();
282 handle.restore_rooms_from_db().await;
283
284 Ok(handle)
285 }
286
287 pub fn mode(&self) -> NetworkMode {
288 self.mode
289 }
290
291 pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
292 self.app_event_tx.subscribe()
293 }
294
295 pub fn fingerprint(&self) -> &str {
296 self.identity.fingerprint()
297 }
298
299 pub fn peer_id(&self) -> PeerId {
300 self.identity.peer_id()
301 }
302
303 pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
304 let now = now_unix();
305 let mut by_id: HashMap<String, DiscoveredRoom> = self
306 .discovered_rooms
307 .lock()
308 .unwrap()
309 .clone();
310
311 for room in self.active_rooms.lock().unwrap().values() {
315 let entry = DiscoveredRoom {
316 room_id: room.info.id.clone(),
317 name: room.info.name.clone(),
318 encrypted: room.info.encrypted,
319 member_count: room.members.len() as u32,
320 creator_fingerprint: room.info.creator_fingerprint.clone(),
321 last_seen: now,
322 restorable: false,
323 };
324 by_id
325 .entry(room.info.id.clone())
326 .and_modify(|d| {
327 d.last_seen = now;
328 if entry.member_count > d.member_count {
329 d.member_count = entry.member_count;
330 }
331 d.restorable = false;
332 })
333 .or_insert(entry);
334 }
335
336 for (id, stored) in self.restorable_rooms.lock().unwrap().iter() {
340 if by_id.contains_key(id) {
341 continue;
342 }
343 by_id.insert(
344 id.clone(),
345 DiscoveredRoom {
346 room_id: id.clone(),
347 name: stored.name.clone(),
348 encrypted: stored.encrypted,
349 member_count: 0,
350 creator_fingerprint: stored.creator_fingerprint.clone(),
351 last_seen: stored.last_active.unwrap_or(stored.created_at),
352 restorable: true,
353 },
354 );
355 }
356
357 let mut v: Vec<DiscoveredRoom> = by_id.into_values().collect();
358 v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
359 v
360 }
361
362 pub fn active_room_ids(&self) -> Vec<String> {
363 self.active_rooms.lock().unwrap().keys().cloned().collect()
364 }
365
366 pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
367 self.active_rooms
368 .lock()
369 .unwrap()
370 .get(room_id)
371 .map(|r| r.info.clone())
372 }
373
374 pub fn room_members(&self, room_id: &str) -> Vec<String> {
375 self.active_rooms
376 .lock()
377 .unwrap()
378 .get(room_id)
379 .map(|r| {
380 let mut m: Vec<String> = r.members.iter().cloned().collect();
381 m.sort();
382 m
383 })
384 .unwrap_or_default()
385 }
386
387 pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
388 repo::get_room_messages(&self.db, room_id, limit)
389 }
390
391 pub fn search_room_messages(
392 &self,
393 room_id: &str,
394 query: &str,
395 limit: i64,
396 ) -> Result<Vec<repo::StoredRoomMessage>> {
397 repo::search_room_messages(&self.db, room_id, query, limit)
398 }
399
400 pub async fn start_room(
402 &self,
403 name: &str,
404 encrypted: bool,
405 passphrase: Option<&str>,
406 ) -> Result<String> {
407 if encrypted && passphrase.is_none() {
408 return Err(HuddleError::Other(
409 "encrypted room requires a passphrase".into(),
410 ));
411 }
412
413 let created_at = now_unix();
414 let creator_fp = self.identity.fingerprint().to_string();
415 let room_id = derive_room_id(&creator_fp, name, created_at);
416
417 let (passphrase_salt, passphrase_key) = if encrypted {
418 let salt = passphrase::random_salt();
419 let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
420 (Some(salt.to_vec()), Some(key))
421 } else {
422 (None, None)
423 };
424
425 let info = StoredRoom {
426 id: room_id.clone(),
427 name: name.to_string(),
428 creator_fingerprint: creator_fp.clone(),
429 encrypted,
430 passphrase_salt: passphrase_salt.clone(),
431 created_at,
432 last_active: Some(created_at),
433 };
434 repo::insert_room(&self.db, &info)?;
435
436 let crypto = if encrypted {
437 Some(RoomCrypto::new_for_room(
438 self.db.clone(),
439 room_id.clone(),
440 creator_fp.clone(),
441 self.session_persist_key,
442 )?)
443 } else {
444 None
445 };
446
447 let mut members = HashSet::new();
448 members.insert(creator_fp.clone());
449
450 repo::upsert_room_member(
454 &self.db,
455 &StoredRoomMember {
456 room_id: room_id.clone(),
457 peer_id: String::new(),
458 fingerprint: creator_fp.clone(),
459 last_seen: Some(created_at),
460 verified: true, ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
462 role: "owner".into(),
463 },
464 )?;
465
466 self.active_rooms.lock().unwrap().insert(
467 room_id.clone(),
468 ActiveRoom {
469 info: info.clone(),
470 crypto,
471 passphrase_key,
472 members,
473 typers: HashMap::new(),
474 read_only: false,
475 issued_codes: Vec::new(),
476 },
477 );
478
479 self.network.subscribe_room(room_id.clone()).await;
480 self.announce_room_now(&info, 1).await;
481
482 let app = self.clone();
485 let rid = room_id.clone();
486 tokio::spawn(async move {
487 tokio::time::sleep(Duration::from_millis(500)).await;
488 if let Err(e) = app.broadcast_member_announce(&rid).await {
489 warn!(%e, "broadcast member announce");
490 }
491 });
492
493 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
494 room_id: room_id.clone(),
495 });
496
497 Ok(room_id)
498 }
499
500 pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
504 let (name, creator_fingerprint, encrypted, salt_opt) = {
506 if let Some(d) = self.discovered_rooms.lock().unwrap().get(room_id).cloned() {
507 let salt = self.get_room_salt(room_id);
508 (d.name, d.creator_fingerprint, d.encrypted, salt)
509 } else if let Some(stored) = self.restorable_rooms.lock().unwrap().get(room_id).cloned()
510 {
511 (
512 stored.name,
513 stored.creator_fingerprint,
514 stored.encrypted,
515 stored.passphrase_salt,
516 )
517 } else if let Some(stored) = repo::get_room(&self.db, room_id)? {
518 (
519 stored.name,
520 stored.creator_fingerprint,
521 stored.encrypted,
522 stored.passphrase_salt,
523 )
524 } else {
525 return Err(HuddleError::Other(format!("room {room_id} not found")));
526 }
527 };
528
529 if encrypted && passphrase.is_none() {
530 return Err(HuddleError::Other(
531 "encrypted room requires a passphrase".into(),
532 ));
533 }
534
535 let passphrase_key = if encrypted {
536 let salt = salt_opt
537 .clone()
538 .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
539 Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
540 } else {
541 None
542 };
543
544 let info = StoredRoom {
545 id: room_id.to_string(),
546 name,
547 creator_fingerprint,
548 encrypted,
549 passphrase_salt: salt_opt.clone(),
550 created_at: now_unix(),
551 last_active: Some(now_unix()),
552 };
553 repo::insert_room(&self.db, &info)?;
554
555 let crypto = if encrypted {
556 let our_fp = self.identity.fingerprint().to_string();
559 let existing = RoomCrypto::load(
560 self.db.clone(),
561 room_id.to_string(),
562 our_fp.clone(),
563 self.session_persist_key,
564 )?;
565 Some(match existing {
566 Some(c) => c,
567 None => RoomCrypto::new_for_room(
568 self.db.clone(),
569 room_id.to_string(),
570 our_fp,
571 self.session_persist_key,
572 )?,
573 })
574 } else {
575 None
576 };
577
578 let mut members = HashSet::new();
579 members.insert(self.identity.fingerprint().to_string());
580
581 self.active_rooms.lock().unwrap().insert(
582 room_id.to_string(),
583 ActiveRoom {
584 info: info.clone(),
585 crypto,
586 passphrase_key,
587 members,
588 typers: HashMap::new(),
589 read_only: false,
590 issued_codes: Vec::new(),
591 },
592 );
593 self.restorable_rooms.lock().unwrap().remove(room_id);
595
596 self.network.subscribe_room(room_id.to_string()).await;
597
598 let app = self.clone();
599 let rid = room_id.to_string();
600 tokio::spawn(async move {
601 tokio::time::sleep(Duration::from_millis(500)).await;
602 if let Err(e) = app.broadcast_member_announce(&rid).await {
603 warn!(%e, "broadcast member announce");
604 }
605 let req = RoomMessage::SessionKeyRequest {
607 requester_fingerprint: app.identity.fingerprint().to_string(),
608 };
609 if let Ok(bytes) = encode_wire(&req) {
610 app.network.publish_room_message(rid.clone(), bytes).await;
611 }
612 });
613
614 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
615 room_id: room_id.to_string(),
616 });
617
618 Ok(())
619 }
620
621 async fn restore_rooms_from_db(&self) {
626 let rooms = match repo::list_rooms(&self.db) {
627 Ok(v) => v,
628 Err(e) => {
629 warn!(%e, "list rooms on restore");
630 return;
631 }
632 };
633 let our_fp = self.identity.fingerprint().to_string();
634 let count = rooms.len();
635 for info in rooms {
636 if info.encrypted {
637 self.restorable_rooms
638 .lock()
639 .unwrap()
640 .insert(info.id.clone(), info);
641 continue;
642 }
643 let mut members = HashSet::new();
644 members.insert(our_fp.clone());
645 if let Ok(stored_members) = repo::list_room_members(&self.db, &info.id) {
646 for m in stored_members {
647 members.insert(m.fingerprint);
648 }
649 }
650 let member_count = members.len() as u32;
651 self.active_rooms.lock().unwrap().insert(
652 info.id.clone(),
653 ActiveRoom {
654 info: info.clone(),
655 crypto: None,
656 passphrase_key: None,
657 members,
658 typers: HashMap::new(),
659 read_only: false,
660 issued_codes: Vec::new(),
661 },
662 );
663 self.network.subscribe_room(info.id.clone()).await;
664 self.announce_room_now(&info, member_count).await;
665 info!(room_id = %info.id, name = %info.name, "restored room");
666 }
667 if count > 0 {
668 debug!(count, "restored rooms from db");
669 }
670 }
671
672 pub async fn leave_room(&self, room_id: &str) -> Result<bool> {
677 let leave_msg = RoomMessage::MemberLeave {
679 sender_fingerprint: self.identity.fingerprint().to_string(),
680 };
681 let dispatched = match encode_wire(&leave_msg) {
682 Ok(bytes) => {
683 self.network
684 .publish_room_message(room_id.to_string(), bytes)
685 .await;
686 true
687 }
688 Err(e) => {
689 warn!(%e, %room_id, "failed to encode MemberLeave notice");
690 false
691 }
692 };
693
694 self.active_rooms.lock().unwrap().remove(room_id);
695 self.network.unsubscribe_room(room_id.to_string()).await;
696
697 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
698 room_id: room_id.to_string(),
699 });
700 Ok(dispatched)
701 }
702
703 pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
704 let our_fp = self.identity.fingerprint().to_string();
705 let msg = {
706 let mut rooms = self.active_rooms.lock().unwrap();
707 let room = rooms
708 .get_mut(room_id)
709 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
710
711 if room.read_only {
712 return Err(HuddleError::Other(
713 "this room is read-only — you joined via code without the passphrase. Ask an owner for the passphrase or wait for a key rotation that includes you.".into(),
714 ));
715 }
716
717 if room.info.encrypted {
718 let crypto = room
719 .crypto
720 .as_mut()
721 .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
722 let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
723 RoomMessage::Encrypted {
724 sender_fingerprint: our_fp.clone(),
725 session_id,
726 ciphertext_b64: base64::Engine::encode(
727 &base64::engine::general_purpose::STANDARD,
728 &ct_bytes,
729 ),
730 }
731 } else {
732 RoomMessage::Plain {
733 sender_fingerprint: our_fp.clone(),
734 body: body.to_string(),
735 }
736 }
737 };
738
739 let bytes = encode_wire(&msg)?;
740 self.network
741 .publish_room_message(room_id.to_string(), bytes)
742 .await;
743
744 let now = now_unix();
745 let msg_id =
746 repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
747 repo::update_room_last_active(&self.db, room_id, now)?;
748
749 let _ = self.app_event_tx.send(AppEvent::MessageSent {
750 room_id: room_id.to_string(),
751 body: body.to_string(),
752 message_id: msg_id,
753 });
754
755 Ok(())
756 }
757
758 pub async fn shutdown(&self) {
759 self.network.shutdown().await;
760 }
761
762 pub async fn dial(&self, input: &str) -> Result<()> {
771 let multiaddr = parse_dial_address(input)?;
772 let canonical = multiaddr.to_string();
773 info!(%canonical, "dialing");
774
775 repo::upsert_known_peer(
776 &self.db,
777 &KnownPeer {
778 address: canonical.clone(),
779 label: None,
780 last_connected_at: None,
781 last_attempt_at: Some(now_unix()),
782 created_at: now_unix(),
783 fingerprint: None,
787 trusted: false,
788 },
789 )?;
790
791 let _ = self.app_event_tx.send(AppEvent::Dialing {
792 address: canonical.clone(),
793 });
794 self.network.dial(multiaddr).await;
795 Ok(())
796 }
797
798 pub fn nat_reachable_addrs(&self) -> Vec<String> {
803 self.nat_reachable_addrs
804 .lock()
805 .unwrap()
806 .iter()
807 .cloned()
808 .collect()
809 }
810
811 pub fn dialable_addrs(&self) -> Vec<String> {
819 let mut out: Vec<String> = self
820 .relay_circuit_addrs
821 .lock()
822 .unwrap()
823 .iter()
824 .cloned()
825 .collect();
826 for a in self.nat_reachable_addrs.lock().unwrap().iter() {
827 if !out.contains(a) {
828 out.push(a.clone());
829 }
830 }
831 out.truncate(4);
832 out
833 }
834
835 pub async fn dial_invite(&self, address: &str, claimed_fp: &str) -> Result<()> {
848 let multiaddr = parse_dial_address(address)?;
849 let canonical = multiaddr.to_string();
850 self.pending_invite_dials
851 .lock()
852 .unwrap()
853 .insert(canonical.clone(), claimed_fp.to_string());
854 self.dial(address).await
857 }
858
859 pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
860 let connected = self.connected_dial_addrs.lock().unwrap().clone();
861 let stored = repo::list_known_peers(&self.db).unwrap_or_default();
862 stored
863 .into_iter()
864 .map(|p| {
865 let connected_peer = connected.get(&p.address).copied();
866 KnownPeerStatus {
867 address: p.address,
868 label: p.label,
869 last_connected_at: p.last_connected_at,
870 connected_peer_id: connected_peer,
871 }
872 })
873 .collect()
874 }
875
876 pub async fn forget_peer(&self, address: &str) -> Result<()> {
877 repo::forget_known_peer(&self.db, address)?;
878 self.connected_dial_addrs.lock().unwrap().remove(address);
879 Ok(())
880 }
881
882 pub async fn redial(&self, address: &str) -> Result<()> {
884 self.dial(address).await
885 }
886
887 pub async fn accept_inbound(&self, peer_id: PeerId, address: &str) {
892 self.network.accept_inbound(peer_id).await;
893 self.connected_dial_addrs
894 .lock()
895 .unwrap()
896 .insert(address.to_string(), peer_id);
897 }
898
899 pub async fn reject_inbound(&self, peer_id: PeerId, fingerprint: &str) -> Result<()> {
904 self.network.reject_inbound(peer_id).await;
905 repo::block_peer(&self.db, fingerprint, now_unix())?;
906 Ok(())
907 }
908
909 pub async fn trust_inbound(
912 &self,
913 peer_id: PeerId,
914 fingerprint: &str,
915 address: &str,
916 ) -> Result<()> {
917 self.network.accept_inbound(peer_id).await;
918 self.connected_dial_addrs
919 .lock()
920 .unwrap()
921 .insert(address.to_string(), peer_id);
922 repo::upsert_known_peer(
926 &self.db,
927 &KnownPeer {
928 address: address.to_string(),
929 label: None,
930 last_connected_at: Some(now_unix()),
931 last_attempt_at: Some(now_unix()),
932 created_at: now_unix(),
933 fingerprint: Some(fingerprint.to_string()),
934 trusted: true,
935 },
936 )?;
937 Ok(())
938 }
939
940 fn spawn_known_peer_reconnector(&self) {
941 let handle = self.clone();
942 tokio::spawn(async move {
943 tokio::time::sleep(Duration::from_millis(500)).await;
945 let known = repo::list_known_peers(&handle.db).unwrap_or_default();
946 for (i, peer) in known.into_iter().enumerate() {
950 let handle = handle.clone();
951 tokio::spawn(async move {
952 let jitter = (peer.address.len() as u64 * 37) % 200;
955 tokio::time::sleep(Duration::from_millis(150 * i as u64 + jitter)).await;
956 if let Err(e) = handle.dial(&peer.address).await {
957 debug!(%e, addr = %peer.address, "auto-reconnect failed");
958 }
959 });
960 }
961 });
962 }
963
964 fn load_or_create_identity(db: &Db) -> Result<Identity> {
969 if let Some(stored) = repo::load_identity(db)? {
970 let mut bytes = [0u8; 32];
971 bytes.copy_from_slice(&stored.ed25519_secret);
972 Identity::from_secret_bytes(bytes)
973 } else {
974 let id = Identity::generate()?;
975 repo::save_identity(db, &id.secret_bytes(), now_unix())?;
976 Ok(id)
977 }
978 }
979
980 fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
981 self.active_rooms
982 .lock()
983 .unwrap()
984 .get(room_id)
985 .and_then(|r| r.info.passphrase_salt.clone())
986 .or_else(|| {
987 ROOM_SALT_CACHE
989 .lock()
990 .unwrap()
991 .get(room_id)
992 .cloned()
993 })
994 }
995
996 async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
997 let owner_fingerprints =
998 repo::list_room_owners(&self.db, &info.id).unwrap_or_default();
999 let verified_only = repo::get_room_verified_only(&self.db, &info.id).unwrap_or(false);
1000 let host_addrs = self.dialable_addrs();
1001 let ann = RoomAnnouncement {
1002 room_id: info.id.clone(),
1003 name: info.name.clone(),
1004 encrypted: info.encrypted,
1005 passphrase_salt: info.passphrase_salt.clone(),
1006 member_count,
1007 creator_fingerprint: info.creator_fingerprint.clone(),
1008 announced_at: now_unix(),
1009 owner_fingerprints,
1010 verified_only,
1011 host_addrs,
1012 };
1013 self.network.announce_room(ann).await;
1014 }
1015
1016 async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
1017 let our_fp = self.identity.fingerprint().to_string();
1018 let wrapped = {
1019 let mut rooms = self.active_rooms.lock().unwrap();
1020 let room = rooms
1021 .get_mut(room_id)
1022 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
1023 if room.info.encrypted {
1024 let crypto = room.crypto.as_mut().unwrap();
1025 let session_key = crypto.our_session_key_b64();
1026 let passphrase_key = room
1027 .passphrase_key
1028 .as_ref()
1029 .ok_or_else(|| HuddleError::Session("missing passphrase key".into()))?;
1030 Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
1031 } else {
1032 None
1033 }
1034 };
1035 let display_name = repo::get_display_name(&self.db).unwrap_or(None);
1036 let msg = RoomMessage::MemberAnnounce {
1037 sender_fingerprint: our_fp,
1038 wrapped_session_key: wrapped,
1039 display_name,
1040 sender_ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
1041 };
1042 let bytes = encode_wire(&msg)?;
1043 self.network
1044 .publish_room_message(room_id.to_string(), bytes)
1045 .await;
1046 Ok(())
1047 }
1048
1049 fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
1050 let handle = self.clone();
1051 tokio::spawn(async move {
1052 while let Some(event) = net_rx.recv().await {
1053 handle.process_network_event(event).await;
1054 }
1055 info!("event processor stopped");
1056 });
1057 }
1058
1059 fn spawn_announcement_ticker(&self) {
1060 let handle = self.clone();
1061 tokio::spawn(async move {
1062 let mut interval =
1063 tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
1064 interval.tick().await; loop {
1066 interval.tick().await;
1067 let snapshot: Vec<(StoredRoom, u32)> = {
1068 let active = handle.active_rooms.lock().unwrap();
1069 active
1070 .values()
1071 .map(|r| (r.info.clone(), r.members.len() as u32))
1072 .collect()
1073 };
1074 for (info, member_count) in snapshot {
1075 handle.announce_room_now(&info, member_count).await;
1076 }
1077 }
1078 });
1079 }
1080
1081 fn spawn_discovered_room_pruner(&self) {
1082 let handle = self.clone();
1083 tokio::spawn(async move {
1084 let mut interval = tokio::time::interval(Duration::from_secs(10));
1085 interval.tick().await;
1086 loop {
1087 interval.tick().await;
1088 let now = now_unix();
1089 let mut to_drop = Vec::new();
1090 {
1091 let mut map = handle.discovered_rooms.lock().unwrap();
1092 map.retain(|id, r| {
1093 if now - r.last_seen > DISCOVERED_TTL_SECS {
1094 to_drop.push(id.clone());
1095 false
1096 } else {
1097 true
1098 }
1099 });
1100 }
1101 for id in to_drop {
1102 let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
1103 }
1104 }
1105 });
1106 }
1107
1108 async fn process_network_event(&self, event: NetworkEvent) {
1109 match event {
1110 NetworkEvent::PeerDiscovered { peer_id } => {
1111 let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
1112 }
1113 NetworkEvent::PeerExpired { peer_id } => {
1114 self.connected_dial_addrs
1120 .lock()
1121 .unwrap()
1122 .retain(|_addr, pid| *pid != peer_id);
1123 let _ = self.app_event_tx.send(AppEvent::PeerExpired { peer_id });
1124 }
1125 NetworkEvent::ListeningOn { address } => {
1126 let _ = self.app_event_tx.send(AppEvent::ListeningOn {
1127 address: address.to_string(),
1128 });
1129 }
1130 NetworkEvent::RoomAnnouncementReceived(ann) => {
1131 if let Some(salt) = &ann.passphrase_salt {
1133 ROOM_SALT_CACHE
1134 .lock()
1135 .unwrap()
1136 .insert(ann.room_id.clone(), salt.clone());
1137 }
1138 let our_fp_for_dial = self.identity.fingerprint().to_string();
1143 if ann.creator_fingerprint != our_fp_for_dial && !ann.host_addrs.is_empty() {
1144 let now = now_unix();
1145 let should_dial = {
1146 let mut attempts = self.host_addr_dial_attempts.lock().unwrap();
1147 match attempts.get(&ann.creator_fingerprint).copied() {
1148 Some(last) if now - last < HOST_ADDR_DIAL_BACKOFF_SECS => false,
1149 _ => {
1150 attempts.insert(ann.creator_fingerprint.clone(), now);
1151 true
1152 }
1153 }
1154 };
1155 if should_dial {
1156 if let Some(first) = ann.host_addrs.first() {
1157 info!(
1158 announcer = %ann.creator_fingerprint,
1159 addr = %first,
1160 "opportunistic dial via room announcement host_addrs"
1161 );
1162 let _ = self.dial(first).await;
1165 }
1166 }
1167 }
1168 let discovered = DiscoveredRoom {
1169 room_id: ann.room_id.clone(),
1170 name: ann.name.clone(),
1171 encrypted: ann.encrypted,
1172 member_count: ann.member_count,
1173 creator_fingerprint: ann.creator_fingerprint.clone(),
1174 last_seen: now_unix(),
1175 restorable: false,
1176 };
1177 if self.active_rooms.lock().unwrap().contains_key(&ann.room_id) {
1182 self.discovered_rooms
1183 .lock()
1184 .unwrap()
1185 .insert(ann.room_id.clone(), discovered);
1186 return;
1187 }
1188 self.discovered_rooms
1189 .lock()
1190 .unwrap()
1191 .insert(ann.room_id.clone(), discovered.clone());
1192 let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
1193 }
1194 NetworkEvent::RoomMessageReceived {
1195 room_id,
1196 payload,
1197 from_peer: _,
1198 } => {
1199 let wire: WireMessage = match serde_json::from_slice(&payload) {
1206 Ok(w) => w,
1207 Err(e) => {
1208 warn!(%e, "bad wire envelope");
1209 return;
1210 }
1211 };
1212 let (msg, verified_signer) = match wire {
1213 WireMessage::Plain(m) => (m, None),
1214 WireMessage::Signed(env) => {
1215 let claimed_pubkey = env.ed25519_pubkey_b64.clone();
1216 match crate::crypto::verify_signed(&env) {
1217 Ok((m, fp)) => {
1218 match repo::get_member_ed25519_pubkey(
1225 &self.db, &room_id, &fp,
1226 ) {
1227 Ok(Some(known)) if known != claimed_pubkey => {
1228 warn!(
1229 %fp, %room_id,
1230 "pubkey mismatch vs stored; dropping signed message"
1231 );
1232 return;
1233 }
1234 _ => {}
1235 }
1236 (m, Some(fp))
1237 }
1238 Err(e) => {
1239 warn!(%e, fp = %env.fingerprint, "signed envelope verify failed");
1240 return;
1241 }
1242 }
1243 }
1244 };
1245 self.handle_room_message(&room_id, msg, verified_signer).await;
1246 }
1247 NetworkEvent::DialSucceeded { peer_id, address } => {
1248 let addr_s = address.to_string();
1249 self.connected_dial_addrs
1250 .lock()
1251 .unwrap()
1252 .insert(addr_s.clone(), peer_id);
1253 let _ = repo::upsert_known_peer(
1257 &self.db,
1258 &KnownPeer {
1259 address: addr_s.clone(),
1260 label: None,
1261 last_connected_at: Some(now_unix()),
1262 last_attempt_at: Some(now_unix()),
1263 created_at: now_unix(),
1264 fingerprint: None,
1265 trusted: false,
1266 },
1267 );
1268 let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
1269 address: addr_s,
1270 peer_id,
1271 });
1272 }
1273 NetworkEvent::DialFailed { address, error } => {
1274 let addr_s = address.to_string();
1275 let _ = self.app_event_tx.send(AppEvent::DialFailed {
1276 address: addr_s,
1277 error,
1278 });
1279 }
1280 NetworkEvent::PeerIdentified { peer_id, fingerprint } => {
1281 let matched_addrs: Vec<String> = {
1287 let map = self.connected_dial_addrs.lock().unwrap();
1288 map.iter()
1289 .filter_map(|(addr, pid)| {
1290 if *pid == peer_id {
1291 Some(addr.clone())
1292 } else {
1293 None
1294 }
1295 })
1296 .collect()
1297 };
1298 let mismatch = {
1308 let mut map = self.pending_invite_dials.lock().unwrap();
1309 let mut found: Option<(String, String)> = None;
1310 for addr in &matched_addrs {
1311 if let Some(claimed) = map.remove(addr) {
1312 if claimed != fingerprint {
1313 found = Some((addr.clone(), claimed));
1314 break;
1315 }
1316 }
1317 }
1318 found
1319 };
1320 if let Some((addr, claimed)) = mismatch {
1321 warn!(
1322 %addr, %claimed, actual=%fingerprint,
1323 "invite fingerprint mismatch — disconnecting"
1324 );
1325 self.network.disconnect_peer(peer_id).await;
1326 let _ = self.app_event_tx.send(AppEvent::InviteFingerprintMismatch {
1327 address: addr,
1328 claimed,
1329 actual: fingerprint.clone(),
1330 });
1331 return;
1332 }
1333 for addr in matched_addrs {
1334 let _ = repo::upsert_known_peer(
1335 &self.db,
1336 &KnownPeer {
1337 address: addr,
1338 label: None,
1339 last_connected_at: Some(now_unix()),
1340 last_attempt_at: Some(now_unix()),
1341 created_at: now_unix(),
1342 fingerprint: Some(fingerprint.clone()),
1343 trusted: true,
1344 },
1345 );
1346 }
1347 let our_username = repo::get_display_name(&self.db).unwrap_or(None);
1355 if our_username.is_some() {
1356 let now_ms = now_unix_ms();
1357 let should_send = {
1358 let mut last = self.last_profile_broadcast_at_ms.lock().unwrap();
1359 match last.get(&fingerprint) {
1360 Some(prev) if now_ms - prev < PROFILE_REBROADCAST_FLOOR_MS => false,
1361 _ => {
1362 last.insert(fingerprint.clone(), now_ms);
1363 true
1364 }
1365 }
1366 };
1367 if should_send {
1368 let msg = RoomMessage::ProfileUpdate {
1369 sender_fingerprint: self.identity.fingerprint().to_string(),
1370 username: our_username,
1371 updated_at: now_ms,
1372 };
1373 if let Ok(env) = crate::crypto::sign_message(&self.identity, &msg) {
1374 if let Ok(bytes) =
1375 crate::network::protocol::encode_wire_signed(&env)
1376 {
1377 let rooms: Vec<String> = self
1378 .active_rooms
1379 .lock()
1380 .unwrap()
1381 .keys()
1382 .cloned()
1383 .collect();
1384 for room_id in rooms {
1385 self.network
1386 .publish_room_message(room_id, bytes.clone())
1387 .await;
1388 }
1389 }
1390 }
1391 }
1392 }
1393 }
1394 NetworkEvent::RelayReservationEstablished { address } => {
1395 info!(addr = %address, "relay reservation established");
1400 self.relay_circuit_addrs
1401 .lock()
1402 .unwrap()
1403 .insert(address.to_string());
1404 let _ = self.app_event_tx.send(AppEvent::ListeningOn {
1405 address: address.to_string(),
1406 });
1407 }
1408 NetworkEvent::NatProbeResult {
1409 tested_addr,
1410 reachable,
1411 } => {
1412 let addr_s = tested_addr.to_string();
1413 let (transitioned, becomes_reachable) = {
1414 let mut set = self.nat_reachable_addrs.lock().unwrap();
1415 let was_empty = set.is_empty();
1416 if reachable {
1417 set.insert(addr_s.clone());
1418 } else {
1419 set.remove(&addr_s);
1420 }
1421 let is_empty = set.is_empty();
1422 (was_empty != is_empty, !is_empty)
1423 };
1424 if transitioned {
1425 let label = if becomes_reachable {
1426 "reachable".to_string()
1427 } else {
1428 "private".to_string()
1429 };
1430 info!(reachable = %becomes_reachable, "NAT reachability changed");
1431 let _ = self.app_event_tx.send(AppEvent::NatStatusChanged {
1432 label,
1433 reachable: becomes_reachable,
1434 });
1435 }
1436 }
1437 NetworkEvent::DcutrUpgrade {
1438 remote_peer,
1439 success,
1440 } => {
1441 if success {
1442 let s = remote_peer.to_base58();
1446 let tail: String = s.chars().rev().take(8).collect::<String>()
1447 .chars()
1448 .rev()
1449 .collect();
1450 let _ = self.app_event_tx.send(AppEvent::DcutrSucceeded {
1451 peer_label: tail,
1452 });
1453 }
1454 }
1455 NetworkEvent::InboundDial {
1456 peer_id,
1457 fingerprint,
1458 address,
1459 } => {
1460 if repo::is_peer_blocked(&self.db, &fingerprint).unwrap_or(false) {
1462 info!(%fingerprint, "inbound dial auto-rejected: peer is blocked");
1463 self.network.reject_inbound(peer_id).await;
1464 return;
1465 }
1466 let global_verified_only =
1471 repo::get_setting(&self.db, "verified_only_inbound")
1472 .ok()
1473 .flatten()
1474 .map(|v| v == "1")
1475 .unwrap_or(false);
1476 if global_verified_only {
1477 let is_verified =
1478 repo::is_globally_verified(&self.db, &fingerprint).unwrap_or(false)
1479 || repo::is_fingerprint_trusted(&self.db, &fingerprint)
1480 .unwrap_or(false);
1481 if !is_verified {
1482 info!(
1483 %fingerprint,
1484 "inbound dial auto-rejected: verified-only mode"
1485 );
1486 self.network.reject_inbound(peer_id).await;
1487 return;
1488 }
1489 }
1490 if repo::is_fingerprint_trusted(&self.db, &fingerprint).unwrap_or(false) {
1491 info!(%fingerprint, "inbound dial auto-accepted: peer is trusted");
1492 self.connected_dial_addrs
1495 .lock()
1496 .unwrap()
1497 .insert(address.to_string(), peer_id);
1498 let _ = repo::upsert_known_peer(
1499 &self.db,
1500 &KnownPeer {
1501 address: address.to_string(),
1502 label: None,
1503 last_connected_at: Some(now_unix()),
1504 last_attempt_at: Some(now_unix()),
1505 created_at: now_unix(),
1506 fingerprint: Some(fingerprint),
1507 trusted: true,
1508 },
1509 );
1510 self.network.accept_inbound(peer_id).await;
1511 return;
1512 }
1513 let _ = self.app_event_tx.send(AppEvent::InboundDial {
1515 peer_id,
1516 fingerprint,
1517 address: address.to_string(),
1518 });
1519 }
1520 }
1521 }
1522
1523 async fn handle_room_message(
1529 &self,
1530 room_id: &str,
1531 msg: RoomMessage,
1532 verified_signer: Option<String>,
1533 ) {
1534 let our_fp = self.identity.fingerprint().to_string();
1535 match msg {
1536 RoomMessage::MemberAnnounce {
1537 sender_fingerprint,
1538 wrapped_session_key,
1539 display_name,
1540 sender_ed25519_pubkey,
1541 } => {
1542 if sender_fingerprint == our_fp {
1543 return;
1544 }
1545 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
1548 .unwrap_or(false)
1549 {
1550 info!(%sender_fingerprint, %room_id, "dropping MemberAnnounce from banned peer");
1551 return;
1552 }
1553 if repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
1560 && !repo::is_globally_verified(&self.db, &sender_fingerprint).unwrap_or(false)
1561 {
1562 info!(
1563 %sender_fingerprint, %room_id,
1564 "dropping MemberAnnounce: room is verified-only and joiner isn't verified"
1565 );
1566 let owners = repo::list_room_owners(&self.db, room_id).unwrap_or_default();
1567 let lowest_owner = owners.iter().min().cloned();
1568 if lowest_owner.as_deref() == Some(&our_fp) {
1569 let msg = RoomMessage::JoinRefused {
1570 room_id: room_id.to_string(),
1571 target_fingerprint: sender_fingerprint.clone(),
1572 reason: "room requires SAS verification — ask an existing member to verify you".into(),
1573 };
1574 if let Ok(env) = crate::crypto::sign_message(&self.identity, &msg) {
1575 if let Ok(bytes) =
1576 crate::network::protocol::encode_wire_signed(&env)
1577 {
1578 self.network
1579 .publish_room_message(room_id.to_string(), bytes)
1580 .await;
1581 }
1582 }
1583 }
1584 return;
1585 }
1586 let need_inbound = {
1587 let mut rooms = self.active_rooms.lock().unwrap();
1588 let room = match rooms.get_mut(room_id) {
1589 Some(r) => r,
1590 None => return,
1591 };
1592 let newly_added = room.members.insert(sender_fingerprint.clone());
1593 if newly_added {
1594 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
1595 room_id: room_id.to_string(),
1596 fingerprint: sender_fingerprint.clone(),
1597 });
1598 }
1599 let _ = repo::upsert_room_member(
1604 &self.db,
1605 &StoredRoomMember {
1606 room_id: room_id.to_string(),
1607 peer_id: String::new(), fingerprint: sender_fingerprint.clone(),
1609 last_seen: Some(now_unix()),
1610 verified: false,
1611 ed25519_pubkey: sender_ed25519_pubkey.clone(),
1612 role: "member".into(),
1618 },
1619 );
1620 if let Some(name) = display_name.as_deref() {
1621 let _ = repo::set_member_display_name(
1622 &self.db,
1623 room_id,
1624 &sender_fingerprint,
1625 Some(name),
1626 );
1627 }
1628 room.info.encrypted && wrapped_session_key.is_some()
1629 };
1630
1631 if need_inbound {
1632 let wrapped = wrapped_session_key.unwrap();
1633 let result = {
1634 let mut rooms = self.active_rooms.lock().unwrap();
1635 let room = rooms.get_mut(room_id).unwrap();
1636 let passphrase_key = match &room.passphrase_key {
1637 Some(k) => k,
1638 None => {
1639 warn!("no passphrase key when receiving session key");
1640 return;
1641 }
1642 };
1643 match passphrase::unwrap(&wrapped, passphrase_key) {
1644 Ok(plain) => match String::from_utf8(plain) {
1645 Ok(key_b64) => {
1646 let crypto = room.crypto.as_mut().unwrap();
1647 crypto.add_inbound_session(&sender_fingerprint, &key_b64)
1648 }
1649 Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
1650 },
1651 Err(e) => Err(e),
1652 }
1653 };
1654 if let Err(e) = result {
1655 error!(%e, "add inbound session failed");
1656 }
1657 }
1658 }
1659 RoomMessage::SessionKeyRequest {
1660 requester_fingerprint,
1661 } => {
1662 if requester_fingerprint == our_fp {
1663 return;
1664 }
1665 if let Err(e) = self.broadcast_member_announce(room_id).await {
1667 warn!(%e, "broadcast member announce on request");
1668 }
1669 }
1670 RoomMessage::Encrypted {
1671 sender_fingerprint,
1672 session_id,
1673 ciphertext_b64,
1674 } => {
1675 if sender_fingerprint == our_fp {
1676 return;
1677 }
1678 let ct_bytes = match base64::Engine::decode(
1679 &base64::engine::general_purpose::STANDARD,
1680 &ciphertext_b64,
1681 ) {
1682 Ok(b) => b,
1683 Err(e) => {
1684 warn!(%e, "bad base64 ciphertext");
1685 return;
1686 }
1687 };
1688 let plaintext = {
1689 let mut rooms = self.active_rooms.lock().unwrap();
1690 let room = match rooms.get_mut(room_id) {
1691 Some(r) => r,
1692 None => return,
1693 };
1694 let crypto = match room.crypto.as_mut() {
1695 Some(c) => c,
1696 None => return,
1697 };
1698 crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
1699 };
1700 match plaintext {
1701 Ok(pt) => {
1702 let body = String::from_utf8_lossy(&pt).to_string();
1703 let sent_at = now_unix();
1704 let _ = repo::insert_room_message(
1705 &self.db,
1706 room_id,
1707 &sender_fingerprint,
1708 "in",
1709 &body,
1710 sent_at,
1711 );
1712 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1713 self.maybe_emit_mention(room_id, &body);
1714 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1715 room_id: room_id.to_string(),
1716 sender_fingerprint,
1717 body,
1718 sent_at,
1719 });
1720 }
1721 Err(e) => {
1722 debug!(%e, "decrypt failed (probably missing session key)");
1723 }
1724 }
1725 }
1726 RoomMessage::Plain {
1727 sender_fingerprint,
1728 body,
1729 } => {
1730 if sender_fingerprint == our_fp {
1731 return;
1732 }
1733 let sent_at = now_unix();
1734 let _ = repo::insert_room_message(
1735 &self.db,
1736 room_id,
1737 &sender_fingerprint,
1738 "in",
1739 &body,
1740 sent_at,
1741 );
1742 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1743 self.maybe_emit_mention(room_id, &body);
1744 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1745 room_id: room_id.to_string(),
1746 sender_fingerprint,
1747 body,
1748 sent_at,
1749 });
1750 }
1751 RoomMessage::Typing { sender_fingerprint } => {
1752 if sender_fingerprint == our_fp {
1753 return;
1754 }
1755 let expiry = now_unix() + TYPING_TTL_SECS;
1756 let mut rooms = self.active_rooms.lock().unwrap();
1757 if let Some(room) = rooms.get_mut(room_id) {
1758 room.typers.insert(sender_fingerprint, expiry);
1759 }
1760 drop(rooms);
1761 let _ = self.app_event_tx.send(AppEvent::TypingChanged {
1762 room_id: room_id.to_string(),
1763 });
1764 }
1765 RoomMessage::RotateRoomKey {
1766 rotator_fingerprint,
1767 new_salt,
1768 } => {
1769 if rotator_fingerprint == our_fp {
1770 return;
1771 }
1772 let signer = match verified_signer {
1777 Some(fp) => fp,
1778 None => {
1779 warn!(%room_id, "RotateRoomKey arrived unsigned; dropping");
1780 return;
1781 }
1782 };
1783 if signer != rotator_fingerprint {
1784 warn!(
1785 %signer, %rotator_fingerprint, %room_id,
1786 "RotateRoomKey signer mismatch with claimed rotator; dropping"
1787 );
1788 return;
1789 }
1790 let _ = self.app_event_tx.send(AppEvent::RotationRequested {
1791 room_id: room_id.to_string(),
1792 rotator_fingerprint,
1793 new_salt,
1794 });
1795 }
1796 RoomMessage::MemberLeave { sender_fingerprint } => {
1797 if sender_fingerprint == our_fp {
1798 return;
1799 }
1800 let removed = {
1801 let mut rooms = self.active_rooms.lock().unwrap();
1802 if let Some(room) = rooms.get_mut(room_id) {
1803 room.members.remove(&sender_fingerprint)
1804 } else {
1805 false
1806 }
1807 };
1808 if removed {
1809 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
1810 room_id: room_id.to_string(),
1811 fingerprint: sender_fingerprint,
1812 });
1813 }
1814 }
1815 RoomMessage::FileOffer {
1816 sender_fingerprint,
1817 file_id,
1818 name,
1819 size_bytes,
1820 mime,
1821 chunk_count,
1822 encrypted_meta,
1823 } => {
1824 if sender_fingerprint == our_fp {
1825 return; }
1827 self.handle_file_offer(
1828 room_id,
1829 sender_fingerprint,
1830 file_id,
1831 name,
1832 size_bytes,
1833 mime,
1834 chunk_count,
1835 encrypted_meta,
1836 );
1837 }
1838 RoomMessage::FileChunk {
1839 sender_fingerprint,
1840 file_id,
1841 chunk_index,
1842 total_chunks,
1843 data_b64,
1844 } => {
1845 if sender_fingerprint == our_fp {
1846 return;
1847 }
1848 self.handle_file_chunk(
1849 room_id,
1850 sender_fingerprint,
1851 file_id,
1852 chunk_index,
1853 total_chunks,
1854 data_b64,
1855 );
1856 }
1857 RoomMessage::OwnerGrant {
1858 room_id: announced_room_id,
1859 target_fingerprint,
1860 } => {
1861 if announced_room_id != room_id {
1866 warn!(payload_room = %announced_room_id, topic_room = %room_id, "OwnerGrant room mismatch");
1867 return;
1868 }
1869 let signer = match verified_signer {
1870 Some(fp) => fp,
1871 None => {
1872 warn!(%room_id, "OwnerGrant arrived unsigned; dropping");
1873 return;
1874 }
1875 };
1876 if !self.is_owner(room_id, &signer) {
1877 warn!(%signer, %room_id, "OwnerGrant signer isn't an owner; dropping");
1878 return;
1879 }
1880 info!(%signer, %target_fingerprint, %room_id, "OwnerGrant applied");
1881 if let Err(e) =
1882 repo::set_member_role(&self.db, room_id, &target_fingerprint, "owner")
1883 {
1884 warn!(%e, "OwnerGrant: set_member_role failed");
1885 }
1886 }
1887 RoomMessage::BanMember {
1888 room_id: announced_room_id,
1889 target_fingerprint,
1890 } => {
1891 if announced_room_id != room_id {
1892 warn!(payload_room = %announced_room_id, topic_room = %room_id, "BanMember room mismatch");
1893 return;
1894 }
1895 let signer = match verified_signer {
1896 Some(fp) => fp,
1897 None => {
1898 warn!(%room_id, "BanMember arrived unsigned; dropping");
1899 return;
1900 }
1901 };
1902 if !self.is_owner(room_id, &signer) {
1903 warn!(%signer, %room_id, "BanMember signer isn't an owner; dropping");
1904 return;
1905 }
1906 if target_fingerprint == our_fp {
1907 info!(%room_id, %signer, "we were kicked from this room");
1913 self.active_rooms.lock().unwrap().remove(room_id);
1914 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
1915 room_id: room_id.to_string(),
1916 });
1917 return;
1918 }
1919 info!(%signer, %target_fingerprint, %room_id, "BanMember applied");
1920 if let Err(e) = repo::add_room_ban(
1921 &self.db,
1922 room_id,
1923 &target_fingerprint,
1924 &signer,
1925 "", now_unix(),
1927 ) {
1928 warn!(%e, "BanMember: add_room_ban failed");
1929 }
1930 self.evict_banned_member(room_id, &target_fingerprint);
1931 }
1932 RoomMessage::SasInit {
1933 tx_id,
1934 ephemeral_x25519_pubkey_b64,
1935 target_fingerprint,
1936 } => {
1937 if target_fingerprint != our_fp {
1938 return;
1943 }
1944 let signer = match verified_signer {
1945 Some(fp) => fp,
1946 None => {
1947 warn!("SasInit arrived unsigned; dropping");
1948 return;
1949 }
1950 };
1951 let their_pub =
1952 match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
1953 Ok(pk) => pk,
1954 Err(e) => {
1955 warn!(%e, "SasInit: bad x25519 pubkey");
1956 return;
1957 }
1958 };
1959 let tx_id_bytes = match B64.decode(&tx_id) {
1960 Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
1961 let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
1962 arr.copy_from_slice(&b);
1963 arr
1964 }
1965 _ => {
1966 warn!(%tx_id, "SasInit: bad tx_id length");
1967 return;
1968 }
1969 };
1970 let (_, our_secret, our_pub) = crate::crypto::sas::new_session();
1971 let sas_code =
1972 crate::crypto::sas::derive_sas_code(&our_secret, &their_pub, &tx_id_bytes);
1973 self.sas_flows.lock().unwrap().insert(
1974 tx_id.clone(),
1975 SasFlow {
1976 room_id: room_id.to_string(),
1977 partner_fingerprint: signer.clone(),
1978 our_secret,
1979 sas_code: Some(sas_code.clone()),
1980 our_confirmed: false,
1981 their_confirmed: false,
1982 },
1983 );
1984 let response = RoomMessage::SasResponse {
1987 tx_id: tx_id.clone(),
1988 ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
1989 };
1990 if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
1991 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
1992 self.network
1993 .publish_room_message(room_id.to_string(), bytes)
1994 .await;
1995 }
1996 }
1997 let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
1998 room_id: room_id.to_string(),
1999 partner_fingerprint: signer,
2000 tx_id,
2001 emoji_string: sas_code.emoji_string(),
2002 emoji_labels: sas_code.emoji_labels(),
2003 decimal: sas_code.decimal,
2004 });
2005 }
2006 RoomMessage::SasResponse {
2007 tx_id,
2008 ephemeral_x25519_pubkey_b64,
2009 } => {
2010 let signer = match verified_signer {
2011 Some(fp) => fp,
2012 None => {
2013 warn!("SasResponse arrived unsigned; dropping");
2014 return;
2015 }
2016 };
2017 let their_pub =
2018 match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
2019 Ok(pk) => pk,
2020 Err(e) => {
2021 warn!(%e, "SasResponse: bad x25519 pubkey");
2022 return;
2023 }
2024 };
2025 let tx_id_bytes = match B64.decode(&tx_id) {
2026 Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
2027 let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
2028 arr.copy_from_slice(&b);
2029 arr
2030 }
2031 _ => return,
2032 };
2033 let emit = {
2034 let mut flows = self.sas_flows.lock().unwrap();
2035 let flow = match flows.get_mut(&tx_id) {
2036 Some(f) => f,
2037 None => {
2038 warn!(%tx_id, "SasResponse for unknown tx_id");
2039 return;
2040 }
2041 };
2042 if flow.partner_fingerprint != signer {
2043 warn!(
2044 expected = %flow.partner_fingerprint, got = %signer,
2045 "SasResponse signer doesn't match flow's partner; dropping"
2046 );
2047 return;
2048 }
2049 let code = crate::crypto::sas::derive_sas_code(
2050 &flow.our_secret,
2051 &their_pub,
2052 &tx_id_bytes,
2053 );
2054 flow.sas_code = Some(code.clone());
2055 code
2056 };
2057 let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
2058 room_id: room_id.to_string(),
2059 partner_fingerprint: signer,
2060 tx_id,
2061 emoji_string: emit.emoji_string(),
2062 emoji_labels: emit.emoji_labels(),
2063 decimal: emit.decimal,
2064 });
2065 }
2066 RoomMessage::CodeJoinRequest {
2067 room_id: announced_room_id,
2068 joiner_x25519_pubkey_b64,
2069 code,
2070 } => {
2071 if announced_room_id != room_id {
2072 return;
2073 }
2074 let joiner_fp = match verified_signer {
2075 Some(fp) => fp,
2076 None => {
2077 warn!("CodeJoinRequest unsigned; dropping");
2078 return;
2079 }
2080 };
2081 let our_fp = self.identity.fingerprint().to_string();
2085 if !self.is_owner(room_id, &our_fp) {
2086 return;
2087 }
2088 let now = now_unix();
2090 let (code_ok, our_session_id, wrap_input) = {
2091 let mut rooms = self.active_rooms.lock().unwrap();
2092 let room = match rooms.get_mut(room_id) {
2093 Some(r) => r,
2094 None => return,
2095 };
2096 if room.passphrase_key.is_none() {
2097 warn!("CodeJoinRequest: no passphrase key locally; can't respond");
2098 return;
2099 }
2100 let original_len = room.issued_codes.len();
2101 room.issued_codes.retain(|(c, exp)| !(c == &code && *exp > now));
2102 let matched = room.issued_codes.len() < original_len;
2103 if !matched {
2104 info!(%joiner_fp, "CodeJoinRequest: code invalid or expired; ignoring");
2105 return;
2106 }
2107 let crypto = room.crypto.as_ref().unwrap();
2108 (
2109 true,
2110 crypto.our_session_id(),
2111 crypto.our_session_key_b64(),
2112 )
2113 };
2114 let _ = code_ok;
2115 let their_pub = match crate::crypto::sas::parse_pubkey(&joiner_x25519_pubkey_b64) {
2117 Ok(pk) => pk,
2118 Err(e) => {
2119 warn!(%e, "CodeJoinRequest: bad pubkey");
2120 return;
2121 }
2122 };
2123 use x25519_dalek::{PublicKey, StaticSecret};
2124 let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
2125 let our_pub = PublicKey::from(&our_secret);
2126 let shared = our_secret.diffie_hellman(&their_pub);
2127 let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
2129 let mut wrap_key = [0u8; passphrase::KEY_LEN];
2130 hk.expand(b"huddle-code-join-v1", &mut wrap_key)
2131 .expect("32 bytes is within HKDF limits");
2132 let wrapped = match passphrase::wrap(wrap_input.as_bytes(), &wrap_key) {
2135 Ok(w) => w,
2136 Err(e) => {
2137 warn!(%e, "CodeJoinRequest: wrap failed");
2138 return;
2139 }
2140 };
2141 let response = RoomMessage::CodeJoinResponse {
2142 room_id: room_id.to_string(),
2143 target_fingerprint: joiner_fp.clone(),
2144 owner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2145 owner_session_id: our_session_id,
2146 wrapped_session_key_b64: wrapped,
2147 nonce_b64: String::new(), };
2149 if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
2150 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
2151 self.network
2152 .publish_room_message(room_id.to_string(), bytes)
2153 .await;
2154 }
2155 }
2156 info!(%joiner_fp, %room_id, "issued CodeJoinResponse");
2157 }
2158 RoomMessage::CodeJoinResponse {
2159 room_id: announced_room_id,
2160 target_fingerprint,
2161 owner_x25519_pubkey_b64,
2162 owner_session_id,
2163 wrapped_session_key_b64,
2164 nonce_b64: _,
2165 } => {
2166 if announced_room_id != room_id || target_fingerprint != our_fp {
2167 return;
2168 }
2169 let owner_fp = match verified_signer {
2170 Some(fp) => fp,
2171 None => {
2172 warn!("CodeJoinResponse unsigned; dropping");
2173 return;
2174 }
2175 };
2176 let our_secret = match self
2177 .pending_code_secrets
2178 .lock()
2179 .unwrap()
2180 .remove(&(room_id.to_string(), our_fp.clone()))
2181 {
2182 Some(s) => s,
2183 None => {
2184 warn!(%room_id, "CodeJoinResponse with no pending code-join state");
2185 return;
2186 }
2187 };
2188 let owner_pub = match crate::crypto::sas::parse_pubkey(&owner_x25519_pubkey_b64) {
2189 Ok(pk) => pk,
2190 Err(e) => {
2191 warn!(%e, "CodeJoinResponse: bad owner pubkey");
2192 return;
2193 }
2194 };
2195 let shared = our_secret.diffie_hellman(&owner_pub);
2196 let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
2197 let mut wrap_key = [0u8; passphrase::KEY_LEN];
2198 hk.expand(b"huddle-code-join-v1", &mut wrap_key)
2199 .expect("32 bytes within HKDF limits");
2200 let session_key_bytes =
2201 match passphrase::unwrap(&wrapped_session_key_b64, &wrap_key) {
2202 Ok(b) => b,
2203 Err(e) => {
2204 warn!(%e, "CodeJoinResponse: unwrap failed");
2205 return;
2206 }
2207 };
2208 let session_key_str = match String::from_utf8(session_key_bytes) {
2209 Ok(s) => s,
2210 Err(e) => {
2211 warn!(%e, "CodeJoinResponse: session key wasn't valid utf8");
2212 return;
2213 }
2214 };
2215 let mut rooms = self.active_rooms.lock().unwrap();
2217 if let Some(room) = rooms.get_mut(room_id) {
2218 if let Some(crypto) = room.crypto.as_mut() {
2219 if let Err(e) =
2220 crypto.add_inbound_session(&owner_fp, &session_key_str)
2221 {
2222 warn!(%e, "CodeJoinResponse: add_inbound_session failed");
2223 } else {
2224 info!(%room_id, %owner_fp, %owner_session_id, "code-join completed; can decrypt owner's messages");
2225 room.members.insert(owner_fp.clone());
2226 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
2227 room_id: room_id.to_string(),
2228 fingerprint: owner_fp,
2229 });
2230 }
2231 }
2232 }
2233 }
2234 RoomMessage::JoinRefused {
2235 room_id: announced_room_id,
2236 target_fingerprint,
2237 reason,
2238 } => {
2239 if announced_room_id != room_id || target_fingerprint != our_fp {
2240 return;
2241 }
2242 let _ = self.app_event_tx.send(AppEvent::Error {
2246 description: format!("join refused: {reason}"),
2247 });
2248 }
2249 RoomMessage::SasConfirm { tx_id, matched } => {
2250 let signer = match verified_signer {
2251 Some(fp) => fp,
2252 None => return,
2253 };
2254 let (room_id_done, partner_fp_done, both_done) = {
2255 let mut flows = self.sas_flows.lock().unwrap();
2256 let flow = match flows.get_mut(&tx_id) {
2257 Some(f) => f,
2258 None => return,
2259 };
2260 if flow.partner_fingerprint != signer {
2261 return;
2262 }
2263 if !matched {
2264 let _ = flow;
2266 flows.remove(&tx_id);
2267 return;
2268 }
2269 flow.their_confirmed = true;
2270 if flow.our_confirmed && flow.their_confirmed {
2271 (
2272 Some(flow.room_id.clone()),
2273 Some(flow.partner_fingerprint.clone()),
2274 true,
2275 )
2276 } else {
2277 (None, None, false)
2278 }
2279 };
2280 if both_done {
2281 if let (Some(rid), Some(pfp)) = (room_id_done, partner_fp_done) {
2282 if let Err(e) = self.finish_sas(&tx_id, &rid, &pfp).await {
2283 warn!(%e, "finish_sas failed");
2284 }
2285 }
2286 }
2287 }
2288 RoomMessage::ProfileUpdate {
2289 sender_fingerprint,
2290 username,
2291 updated_at,
2292 } => {
2293 let signer = match verified_signer {
2299 Some(fp) => fp,
2300 None => {
2301 warn!(
2302 sender = %sender_fingerprint,
2303 "dropping unsigned ProfileUpdate"
2304 );
2305 return;
2306 }
2307 };
2308 if signer != sender_fingerprint {
2309 warn!(
2310 signer = %signer,
2311 claimed = %sender_fingerprint,
2312 "dropping ProfileUpdate with signer != sender"
2313 );
2314 return;
2315 }
2316 if let Err(e) = repo::upsert_peer_profile(
2317 &self.db,
2318 &sender_fingerprint,
2319 username.as_deref(),
2320 updated_at,
2321 ) {
2322 warn!(%e, "upsert_peer_profile failed");
2323 return;
2324 }
2325 let _ = self.app_event_tx.send(AppEvent::PeerProfileUpdated {
2326 fingerprint: sender_fingerprint,
2327 username,
2328 });
2329 }
2330 }
2331 }
2332
2333 pub async fn send_file(&self, room_id: &str, path: &Path) -> Result<String> {
2341 let bytes = std::fs::read(path)?;
2342 let name = path
2343 .file_name()
2344 .map(|n| n.to_string_lossy().to_string())
2345 .unwrap_or_else(|| "untitled".into());
2346 let mime = crate::files::guess_mime(&name);
2347 let original_path = path.to_path_buf();
2348
2349 let (room_encrypted, mut maybe_session_id, encrypted_meta_opt, wire_bytes) = {
2350 let mut rooms = self.active_rooms.lock().unwrap();
2351 let room = rooms
2352 .get_mut(room_id)
2353 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2354 if room.info.encrypted {
2355 let crypto = room
2356 .crypto
2357 .as_mut()
2358 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
2359 let (ciphertext, meta) = file_encryption::encrypt_file(&bytes, crypto)?;
2360 (true, Some(meta.megolm_session_id.clone()), Some(meta), ciphertext)
2361 } else {
2362 (false, None, None, bytes)
2363 }
2364 };
2365 let _ = &mut maybe_session_id; let plan =
2368 self.file_manager
2369 .prepare_outgoing_from_bytes(&name, mime.clone(), wire_bytes)?;
2370 let file_id = plan.file_id.clone();
2371 let total = plan.chunks.len() as u32;
2372 let our_fp = self.identity.fingerprint().to_string();
2373
2374 let attachment = StoredAttachment {
2375 id: 0,
2376 room_id: room_id.to_string(),
2377 message_id: None,
2378 sender_fingerprint: our_fp.clone(),
2379 file_id: file_id.clone(),
2380 name: name.clone(),
2381 mime: mime.clone(),
2382 size_bytes: plan.size_bytes as i64,
2383 status: AttachmentStatus::Ready,
2384 cache_path: Some(self.file_manager.cache_path(&file_id).to_string_lossy().into()),
2385 saved_path: Some(original_path.to_string_lossy().into()),
2386 error: None,
2387 encrypted: room_encrypted,
2388 wrapped_key: encrypted_meta_opt.as_ref().map(|m| m.wrapped_key_b64.clone()),
2389 nonce: encrypted_meta_opt.as_ref().map(|m| m.nonce_b64.clone()),
2390 megolm_session_id: encrypted_meta_opt
2391 .as_ref()
2392 .map(|m| m.megolm_session_id.clone()),
2393 content_hash: encrypted_meta_opt.as_ref().map(|m| m.content_hash.clone()),
2394 created_at: now_unix(),
2395 };
2396 repo::upsert_attachment(&self.db, &attachment)?;
2397 let _ = self.app_event_tx.send(AppEvent::FileOffered {
2398 room_id: room_id.to_string(),
2399 file_id: file_id.clone(),
2400 name: name.clone(),
2401 size_bytes: plan.size_bytes,
2402 sender_fingerprint: our_fp.clone(),
2403 });
2404
2405 let offer = RoomMessage::FileOffer {
2407 sender_fingerprint: our_fp.clone(),
2408 file_id: file_id.clone(),
2409 name,
2410 size_bytes: plan.size_bytes,
2411 mime,
2412 chunk_count: total,
2413 encrypted_meta: encrypted_meta_opt,
2414 };
2415 if let Ok(bytes) = encode_wire(&offer) {
2416 self.network
2417 .publish_room_message(room_id.to_string(), bytes)
2418 .await;
2419 }
2420
2421 let net = self.network.clone();
2424 let room = room_id.to_string();
2425 let our = our_fp.clone();
2426 let fid = file_id.clone();
2427 let chunks = plan.chunks.clone();
2428 tokio::spawn(async move {
2429 for (i, data) in chunks.iter().enumerate() {
2430 let msg = RoomMessage::FileChunk {
2431 sender_fingerprint: our.clone(),
2432 file_id: fid.clone(),
2433 chunk_index: i as u32,
2434 total_chunks: total,
2435 data_b64: B64.encode(data),
2436 };
2437 if let Ok(bytes) = encode_wire(&msg) {
2438 net.publish_room_message(room.clone(), bytes).await;
2439 }
2440 tokio::time::sleep(Duration::from_millis(40)).await;
2441 }
2442 });
2443
2444 Ok(file_id)
2445 }
2446
2447 pub async fn save_to_downloads(&self, room_id: &str, file_id: &str) -> Result<PathBuf> {
2450 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
2451 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
2452 if !matches!(
2453 attachment.status,
2454 AttachmentStatus::Ready | AttachmentStatus::Saved
2455 ) {
2456 return Err(HuddleError::Other(format!(
2457 "attachment is not ready (status={})",
2458 attachment.status.as_str()
2459 )));
2460 }
2461 let plaintext = if attachment.encrypted
2466 && attachment.sender_fingerprint == self.identity.fingerprint()
2467 {
2468 match attachment
2469 .saved_path
2470 .as_deref()
2471 .filter(|p| Path::new(p).exists())
2472 {
2473 Some(src) => std::fs::read(src)?,
2474 None => {
2475 return Err(HuddleError::Other(
2476 "your original file has moved or been deleted — it can't be \
2477 recovered from the encrypted cache"
2478 .into(),
2479 ));
2480 }
2481 }
2482 } else {
2483 let cached = self.file_manager.read_cache(file_id)?;
2484 if attachment.encrypted {
2485 let meta = EncryptedFileMeta {
2486 megolm_session_id: attachment
2487 .megolm_session_id
2488 .clone()
2489 .ok_or_else(|| HuddleError::Other("missing megolm_session_id".into()))?,
2490 wrapped_key_b64: attachment
2491 .wrapped_key
2492 .clone()
2493 .ok_or_else(|| HuddleError::Other("missing wrapped_key".into()))?,
2494 nonce_b64: attachment
2495 .nonce
2496 .clone()
2497 .ok_or_else(|| HuddleError::Other("missing nonce".into()))?,
2498 content_hash: attachment
2499 .content_hash
2500 .clone()
2501 .ok_or_else(|| HuddleError::Other("missing content_hash".into()))?,
2502 };
2503 self.decrypt_attachment(
2504 room_id,
2505 &attachment.sender_fingerprint,
2506 &cached,
2507 &meta,
2508 )?
2509 } else {
2510 cached
2511 }
2512 };
2513 let saved = self.file_manager.write_to_downloads(&attachment.name, &plaintext)?;
2514 repo::update_attachment_paths(
2515 &self.db,
2516 room_id,
2517 file_id,
2518 None,
2519 Some(&saved.to_string_lossy()),
2520 )?;
2521 repo::update_attachment_status(&self.db, room_id, file_id, AttachmentStatus::Saved, None)?;
2522 let _ = self.app_event_tx.send(AppEvent::FileSaved {
2523 file_id: file_id.into(),
2524 path: saved.to_string_lossy().into(),
2525 });
2526 Ok(saved)
2527 }
2528
2529 pub async fn cancel_transfer(&self, room_id: &str, file_id: &str) -> Result<()> {
2531 self.file_manager.cancel_incoming(file_id);
2532 repo::update_attachment_status(
2533 &self.db,
2534 room_id,
2535 file_id,
2536 AttachmentStatus::Cancelled,
2537 None,
2538 )?;
2539 Ok(())
2540 }
2541
2542 pub fn open_saved(&self, room_id: &str, file_id: &str) -> Result<()> {
2544 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
2545 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
2546 let path = attachment
2547 .saved_path
2548 .ok_or_else(|| HuddleError::Other("not saved yet — press Enter to save first".into()))?;
2549 open_with_system(&path)
2550 }
2551
2552 pub fn list_room_attachments(&self, room_id: &str) -> Result<Vec<StoredAttachment>> {
2553 repo::list_room_attachments(&self.db, room_id)
2554 }
2555
2556 pub fn set_member_verified(
2560 &self,
2561 room_id: &str,
2562 fingerprint: &str,
2563 verified: bool,
2564 ) -> Result<()> {
2565 let members = repo::list_room_members(&self.db, room_id).unwrap_or_default();
2570 if !members.iter().any(|m| m.fingerprint == fingerprint) {
2571 repo::upsert_room_member(
2572 &self.db,
2573 &StoredRoomMember {
2574 room_id: room_id.to_string(),
2575 peer_id: String::new(),
2576 fingerprint: fingerprint.to_string(),
2577 last_seen: Some(now_unix()),
2578 verified,
2579 ed25519_pubkey: None,
2580 role: "member".into(),
2581 },
2582 )?;
2583 }
2584 repo::set_member_verified(&self.db, room_id, fingerprint, verified)
2585 }
2586
2587 pub fn verified_fingerprints(&self, room_id: &str) -> Vec<String> {
2588 repo::list_verified_fingerprints(&self.db, room_id).unwrap_or_default()
2589 }
2590
2591 pub fn is_owner(&self, room_id: &str, fingerprint: &str) -> bool {
2594 repo::list_room_owners(&self.db, room_id)
2595 .unwrap_or_default()
2596 .iter()
2597 .any(|fp| fp == fingerprint)
2598 }
2599
2600 pub fn we_are_owner(&self, room_id: &str) -> bool {
2601 self.is_owner(room_id, &self.identity.fingerprint().to_string())
2602 }
2603
2604 pub fn room_owners(&self, room_id: &str) -> Vec<String> {
2607 repo::list_room_owners(&self.db, room_id).unwrap_or_default()
2608 }
2609
2610 pub fn verified_only_inbound(&self) -> bool {
2613 repo::get_setting(&self.db, "verified_only_inbound")
2614 .unwrap_or(None)
2615 .map(|v| v == "1")
2616 .unwrap_or(false)
2617 }
2618
2619 pub fn set_verified_only_inbound(&self, on: bool) -> Result<()> {
2620 repo::set_setting(&self.db, "verified_only_inbound", if on { "1" } else { "0" })
2621 }
2622
2623 pub fn room_verified_only(&self, room_id: &str) -> bool {
2628 repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
2629 }
2630
2631 pub fn set_room_verified_only(&self, room_id: &str, on: bool) -> Result<()> {
2632 repo::set_room_verified_only(&self.db, room_id, on)
2633 }
2634
2635 pub fn onboarding_seen(&self) -> bool {
2637 repo::is_onboarding_seen(&self.db).unwrap_or(true)
2638 }
2639
2640 pub fn mark_onboarding_seen(&self) -> Result<()> {
2641 repo::mark_onboarding_seen(&self.db)
2642 }
2643
2644 pub async fn grant_owner(&self, room_id: &str, target_fingerprint: &str) -> Result<()> {
2648 let our_fp = self.identity.fingerprint().to_string();
2649 if !self.is_owner(room_id, &our_fp) {
2650 return Err(HuddleError::Other(
2651 "only an owner can grant owner".into(),
2652 ));
2653 }
2654 let msg = RoomMessage::OwnerGrant {
2655 room_id: room_id.to_string(),
2656 target_fingerprint: target_fingerprint.to_string(),
2657 };
2658 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2659 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2660 self.network
2661 .publish_room_message(room_id.to_string(), bytes)
2662 .await;
2663 repo::set_member_role(&self.db, room_id, target_fingerprint, "owner")?;
2665 Ok(())
2666 }
2667
2668 pub async fn kick_member(
2679 &self,
2680 room_id: &str,
2681 target_fingerprint: &str,
2682 ) -> Result<String> {
2683 let our_fp = self.identity.fingerprint().to_string();
2684 if !self.is_owner(room_id, &our_fp) {
2685 return Err(HuddleError::Other("only an owner can kick".into()));
2686 }
2687 if target_fingerprint == our_fp {
2688 return Err(HuddleError::Other("can't kick yourself".into()));
2689 }
2690 let info = self
2691 .active_rooms
2692 .lock()
2693 .unwrap()
2694 .get(room_id)
2695 .map(|r| r.info.clone())
2696 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2697 if !info.encrypted {
2698 let msg = RoomMessage::BanMember {
2702 room_id: room_id.to_string(),
2703 target_fingerprint: target_fingerprint.to_string(),
2704 };
2705 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2706 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2707 self.network
2708 .publish_room_message(room_id.to_string(), bytes)
2709 .await;
2710 repo::add_room_ban(
2711 &self.db,
2712 room_id,
2713 target_fingerprint,
2714 &our_fp,
2715 &env.signature_b64,
2716 now_unix(),
2717 )?;
2718 self.evict_banned_member(room_id, target_fingerprint);
2719 return Ok(String::new());
2720 }
2721 let new_passphrase = generate_join_passphrase();
2723 let msg = RoomMessage::BanMember {
2724 room_id: room_id.to_string(),
2725 target_fingerprint: target_fingerprint.to_string(),
2726 };
2727 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2728 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2729 self.network
2730 .publish_room_message(room_id.to_string(), bytes)
2731 .await;
2732 repo::add_room_ban(
2733 &self.db,
2734 room_id,
2735 target_fingerprint,
2736 &our_fp,
2737 &env.signature_b64,
2738 now_unix(),
2739 )?;
2740 self.evict_banned_member(room_id, target_fingerprint);
2741 self.rotate_room(room_id, &new_passphrase).await?;
2744 Ok(new_passphrase)
2745 }
2746
2747 pub fn generate_join_code(&self, room_id: &str) -> Result<String> {
2754 let our_fp = self.identity.fingerprint().to_string();
2755 if !self.is_owner(room_id, &our_fp) {
2756 return Err(HuddleError::Other(
2757 "only an owner can issue join codes".into(),
2758 ));
2759 }
2760 let code = generate_alphanumeric_code(8);
2761 let expires_at = now_unix() + 10 * 60;
2762 let mut rooms = self.active_rooms.lock().unwrap();
2763 let room = rooms
2764 .get_mut(room_id)
2765 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2766 let now = now_unix();
2768 room.issued_codes.retain(|(_, exp)| *exp > now);
2769 room.issued_codes.push((code.clone(), expires_at));
2770 Ok(code)
2771 }
2772
2773 pub async fn join_room_with_code(
2780 &self,
2781 room_id: &str,
2782 code: &str,
2783 ) -> Result<()> {
2784 let info = {
2786 let d = self.discovered_rooms.lock().unwrap().get(room_id).cloned();
2787 match d {
2788 Some(d) => StoredRoom {
2789 id: room_id.to_string(),
2790 name: d.name,
2791 creator_fingerprint: d.creator_fingerprint,
2792 encrypted: d.encrypted,
2793 passphrase_salt: None, created_at: now_unix(),
2795 last_active: Some(now_unix()),
2796 },
2797 None => {
2798 return Err(HuddleError::Other(format!(
2799 "room {room_id} not visible — wait for an announcement"
2800 )))
2801 }
2802 }
2803 };
2804 if !info.encrypted {
2805 return Err(HuddleError::Other(
2806 "code-join only applies to encrypted rooms".into(),
2807 ));
2808 }
2809 let our_fp = self.identity.fingerprint().to_string();
2810 use x25519_dalek::{PublicKey, StaticSecret};
2813 let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
2814 let our_pub = PublicKey::from(&our_secret);
2815 let key = (room_id.to_string(), our_fp.clone());
2820 self.pending_code_secrets
2821 .lock()
2822 .unwrap()
2823 .insert(key.clone(), our_secret);
2824 let map = self.pending_code_secrets.clone();
2829 let tx = self.app_event_tx.clone();
2830 let timeout_room = room_id.to_string();
2831 tokio::spawn(async move {
2832 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
2833 let still_pending = map.lock().unwrap().remove(&key).is_some();
2834 if still_pending {
2835 let _ = tx.send(AppEvent::CodeJoinTimedOut {
2836 room_id: timeout_room,
2837 reason: "no response from owner — code may be wrong or expired".into(),
2838 });
2839 }
2840 });
2841 repo::insert_room(&self.db, &info)?;
2848 self.active_rooms.lock().unwrap().insert(
2851 room_id.to_string(),
2852 ActiveRoom {
2853 info: info.clone(),
2854 crypto: Some(RoomCrypto::new_for_room(
2855 self.db.clone(),
2856 room_id.to_string(),
2857 our_fp.clone(),
2858 self.session_persist_key,
2859 )?),
2860 passphrase_key: None,
2861 members: {
2862 let mut s = HashSet::new();
2863 s.insert(our_fp.clone());
2864 s
2865 },
2866 typers: HashMap::new(),
2867 read_only: true,
2868 issued_codes: Vec::new(),
2869 },
2870 );
2871 self.network.subscribe_room(room_id.to_string()).await;
2872 let req = RoomMessage::CodeJoinRequest {
2874 room_id: room_id.to_string(),
2875 joiner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2876 code: code.to_string(),
2877 };
2878 let env = crate::crypto::sign_message(&self.identity, &req)?;
2879 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2880 self.network
2881 .publish_room_message(room_id.to_string(), bytes)
2882 .await;
2883 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
2886 room_id: room_id.to_string(),
2887 });
2888 Ok(())
2889 }
2890
2891 pub async fn sas_start(&self, room_id: &str, target_fingerprint: &str) -> Result<String> {
2897 let (tx_id_bytes, our_secret, our_pub) = crate::crypto::sas::new_session();
2898 let tx_id = B64.encode(tx_id_bytes);
2899 let msg = RoomMessage::SasInit {
2900 tx_id: tx_id.clone(),
2901 ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2902 target_fingerprint: target_fingerprint.to_string(),
2903 };
2904 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2905 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2906 self.sas_flows.lock().unwrap().insert(
2907 tx_id.clone(),
2908 SasFlow {
2909 room_id: room_id.to_string(),
2910 partner_fingerprint: target_fingerprint.to_string(),
2911 our_secret,
2912 sas_code: None,
2913 our_confirmed: false,
2914 their_confirmed: false,
2915 },
2916 );
2917 self.network
2918 .publish_room_message(room_id.to_string(), bytes)
2919 .await;
2920 Ok(tx_id)
2921 }
2922
2923 pub async fn sas_match(&self, tx_id: &str) -> Result<()> {
2927 let (room_id, partner_fp, both_done) = {
2928 let mut flows = self.sas_flows.lock().unwrap();
2929 let flow = flows
2930 .get_mut(tx_id)
2931 .ok_or_else(|| HuddleError::Other("unknown SAS tx_id".into()))?;
2932 flow.our_confirmed = true;
2933 (
2934 flow.room_id.clone(),
2935 flow.partner_fingerprint.clone(),
2936 flow.our_confirmed && flow.their_confirmed,
2937 )
2938 };
2939 let msg = RoomMessage::SasConfirm {
2940 tx_id: tx_id.to_string(),
2941 matched: true,
2942 };
2943 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2944 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2945 self.network
2946 .publish_room_message(room_id.clone(), bytes)
2947 .await;
2948 if both_done {
2949 self.finish_sas(tx_id, &room_id, &partner_fp).await?;
2950 }
2951 Ok(())
2952 }
2953
2954 pub fn sas_cancel(&self, tx_id: &str) {
2958 self.sas_flows.lock().unwrap().remove(tx_id);
2959 }
2960
2961 async fn finish_sas(
2964 &self,
2965 tx_id: &str,
2966 room_id: &str,
2967 partner_fingerprint: &str,
2968 ) -> Result<()> {
2969 repo::set_member_verified(&self.db, room_id, partner_fingerprint, true)?;
2970 repo::add_verified_peer(&self.db, partner_fingerprint, now_unix())?;
2971 self.sas_flows.lock().unwrap().remove(tx_id);
2972 let _ = self.app_event_tx.send(AppEvent::SasVerified {
2973 room_id: room_id.to_string(),
2974 partner_fingerprint: partner_fingerprint.to_string(),
2975 });
2976 Ok(())
2977 }
2978
2979 fn evict_banned_member(&self, room_id: &str, fingerprint: &str) {
2984 if let Some(room) = self.active_rooms.lock().unwrap().get_mut(room_id) {
2985 room.members.remove(fingerprint);
2986 }
2987 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
2988 room_id: room_id.to_string(),
2989 fingerprint: fingerprint.to_string(),
2990 });
2991 }
2992
2993 pub fn display_name(&self) -> Option<String> {
2994 repo::get_display_name(&self.db).unwrap_or(None)
2995 }
2996
2997 pub fn set_display_name(&self, name: Option<&str>) -> Result<()> {
2998 repo::set_display_name(&self.db, name)
2999 }
3000
3001 pub async fn set_username(&self, name: Option<&str>) -> Result<()> {
3007 repo::set_display_name(&self.db, name)?;
3008 let msg = RoomMessage::ProfileUpdate {
3009 sender_fingerprint: self.identity.fingerprint().to_string(),
3010 username: name.map(|s| s.to_string()),
3011 updated_at: now_unix_ms(),
3012 };
3013 let env = crate::crypto::sign_message(&self.identity, &msg)?;
3014 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
3015 let rooms: Vec<String> = self.active_rooms.lock().unwrap().keys().cloned().collect();
3016 for room_id in rooms {
3017 self.network
3018 .publish_room_message(room_id, bytes.clone())
3019 .await;
3020 }
3021 Ok(())
3022 }
3023
3024 pub fn lookup_username(&self, fingerprint: &str) -> Option<String> {
3029 repo::get_peer_username(&self.db, fingerprint).unwrap_or(None)
3030 }
3031
3032 pub fn lookup_member_display_name(&self, fingerprint: &str) -> Option<String> {
3036 self.lookup_username(fingerprint)
3037 }
3038
3039 pub fn is_room_muted(&self, room_id: &str) -> bool {
3040 repo::is_room_muted(&self.db, room_id).unwrap_or(false)
3041 }
3042
3043 pub fn list_room_bans(&self, room_id: &str) -> Vec<String> {
3048 repo::list_room_bans(&self.db, room_id).unwrap_or_default()
3049 }
3050
3051 pub fn list_blocked_peers(&self) -> Vec<String> {
3055 repo::list_blocked_peers(&self.db).unwrap_or_default()
3056 }
3057
3058 pub fn unblock_peer(&self, fingerprint: &str) -> Result<()> {
3062 repo::unblock_peer(&self.db, fingerprint)
3063 }
3064
3065 pub fn is_room_read_only(&self, room_id: &str) -> bool {
3071 self.active_rooms
3072 .lock()
3073 .unwrap()
3074 .get(room_id)
3075 .map(|r| r.read_only)
3076 .unwrap_or(false)
3077 }
3078
3079 pub fn set_room_muted(&self, room_id: &str, muted: bool) -> Result<()> {
3080 repo::set_room_muted(&self.db, room_id, muted)
3081 }
3082
3083 pub async fn broadcast_typing(&self, room_id: &str) {
3086 if !self.active_rooms.lock().unwrap().contains_key(room_id) {
3087 return;
3088 }
3089 let msg = RoomMessage::Typing {
3090 sender_fingerprint: self.identity.fingerprint().to_string(),
3091 };
3092 if let Ok(bytes) = encode_wire(&msg) {
3093 self.network
3094 .publish_room_message(room_id.to_string(), bytes)
3095 .await;
3096 }
3097 }
3098
3099 pub fn typers_in_room(&self, room_id: &str) -> Vec<String> {
3102 let now = now_unix();
3103 let mut rooms = self.active_rooms.lock().unwrap();
3104 let room = match rooms.get_mut(room_id) {
3105 Some(r) => r,
3106 None => return Vec::new(),
3107 };
3108 room.typers.retain(|_, exp| *exp > now);
3109 let mut v: Vec<String> = room.typers.keys().cloned().collect();
3110 v.sort();
3111 v
3112 }
3113
3114 pub async fn rotate_room(&self, room_id: &str, new_passphrase: &str) -> Result<()> {
3124 if new_passphrase.is_empty() {
3125 return Err(HuddleError::Other("new passphrase is empty".into()));
3126 }
3127 let new_salt = passphrase::random_salt();
3128 let new_key = passphrase::derive_key(new_passphrase, &new_salt)?;
3129
3130 let info = {
3131 let mut rooms = self.active_rooms.lock().unwrap();
3132 let room = rooms
3133 .get_mut(room_id)
3134 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
3135 if !room.info.encrypted {
3136 return Err(HuddleError::Other(
3137 "rotation only applies to encrypted rooms".into(),
3138 ));
3139 }
3140 let new_crypto = RoomCrypto::new_for_room(
3142 self.db.clone(),
3143 room_id.to_string(),
3144 self.identity.fingerprint().to_string(),
3145 self.session_persist_key,
3146 )?;
3147 room.crypto = Some(new_crypto);
3148 room.passphrase_key = Some(new_key);
3149 room.info.passphrase_salt = Some(new_salt.to_vec());
3150 room.info.clone()
3151 };
3152
3153 let rot = RoomMessage::RotateRoomKey {
3159 rotator_fingerprint: self.identity.fingerprint().to_string(),
3160 new_salt: new_salt.to_vec(),
3161 };
3162 if let Ok(env) = crate::crypto::sign_message(&self.identity, &rot) {
3166 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
3167 self.network
3168 .publish_room_message(room_id.to_string(), bytes)
3169 .await;
3170 }
3171 }
3172 if let Err(e) = self.broadcast_member_announce(room_id).await {
3174 warn!(%e, "rotate: broadcast announce failed");
3175 }
3176
3177 repo::insert_room(&self.db, &info)?;
3179 Ok(())
3180 }
3181
3182 pub async fn accept_rotation(
3186 &self,
3187 room_id: &str,
3188 new_salt: &[u8],
3189 new_passphrase: &str,
3190 ) -> Result<()> {
3191 let new_key = passphrase::derive_key(new_passphrase, new_salt)?;
3192 let info = {
3193 let mut rooms = self.active_rooms.lock().unwrap();
3194 let room = rooms
3195 .get_mut(room_id)
3196 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
3197 room.passphrase_key = Some(new_key);
3198 room.info.passphrase_salt = Some(new_salt.to_vec());
3199 room.info.clone()
3200 };
3201 let req = RoomMessage::SessionKeyRequest {
3205 requester_fingerprint: self.identity.fingerprint().to_string(),
3206 };
3207 if let Ok(bytes) = encode_wire(&req) {
3208 self.network
3209 .publish_room_message(room_id.to_string(), bytes)
3210 .await;
3211 }
3212 repo::insert_room(&self.db, &info)?;
3213 Ok(())
3214 }
3215
3216 #[allow(clippy::too_many_arguments)]
3221 fn handle_file_offer(
3222 &self,
3223 room_id: &str,
3224 sender_fingerprint: String,
3225 file_id: String,
3226 name: String,
3227 size_bytes: u64,
3228 mime: Option<String>,
3229 _chunk_count: u32,
3230 encrypted_meta: Option<EncryptedFileMeta>,
3231 ) {
3232 let encrypted = encrypted_meta.is_some();
3233 let attachment = StoredAttachment {
3234 id: 0,
3235 room_id: room_id.to_string(),
3236 message_id: None,
3237 sender_fingerprint: sender_fingerprint.clone(),
3238 file_id: file_id.clone(),
3239 name: name.clone(),
3240 mime,
3241 size_bytes: size_bytes as i64,
3242 status: AttachmentStatus::Offered,
3243 cache_path: None,
3244 saved_path: None,
3245 error: None,
3246 encrypted,
3247 wrapped_key: encrypted_meta.as_ref().map(|m| m.wrapped_key_b64.clone()),
3248 nonce: encrypted_meta.as_ref().map(|m| m.nonce_b64.clone()),
3249 megolm_session_id: encrypted_meta.as_ref().map(|m| m.megolm_session_id.clone()),
3250 content_hash: encrypted_meta.as_ref().map(|m| m.content_hash.clone()),
3251 created_at: now_unix(),
3252 };
3253 if let Err(e) = repo::upsert_attachment(&self.db, &attachment) {
3254 warn!(%e, "upsert attachment");
3255 return;
3256 }
3257 self.file_manager.set_expected_size(&file_id, size_bytes);
3260 let _ = self.app_event_tx.send(AppEvent::FileOffered {
3261 room_id: room_id.to_string(),
3262 file_id,
3263 name,
3264 size_bytes,
3265 sender_fingerprint,
3266 });
3267 }
3268
3269 fn handle_file_chunk(
3270 &self,
3271 room_id: &str,
3272 _sender_fingerprint: String,
3273 file_id: String,
3274 chunk_index: u32,
3275 total_chunks: u32,
3276 data_b64: String,
3277 ) {
3278 let data = match B64.decode(&data_b64) {
3279 Ok(d) => d,
3280 Err(e) => {
3281 warn!(%e, "bad chunk base64");
3282 return;
3283 }
3284 };
3285 let expected_size = match repo::get_attachment(&self.db, room_id, &file_id) {
3289 Ok(Some(a)) => {
3290 if matches!(
3291 a.status,
3292 AttachmentStatus::Cancelled | AttachmentStatus::Failed
3293 ) {
3294 return;
3295 }
3296 a.size_bytes as u64
3297 }
3298 Ok(None) => crate::files::MAX_FILE_SIZE,
3299 Err(e) => {
3300 warn!(%e, "get attachment for chunk");
3301 crate::files::MAX_FILE_SIZE
3302 }
3303 };
3304
3305 let result = self.file_manager.accept_chunk(
3306 &file_id,
3307 chunk_index,
3308 total_chunks,
3309 data,
3310 expected_size,
3311 );
3312 match result {
3313 Ok(None) => {
3314 let _ = repo::update_attachment_status(
3316 &self.db,
3317 room_id,
3318 &file_id,
3319 AttachmentStatus::Downloading,
3320 None,
3321 );
3322 let bytes_so_far = self
3325 .file_manager
3326 .progress(&file_id)
3327 .map(|(b, _)| b)
3328 .unwrap_or(0);
3329 let _ = self.app_event_tx.send(AppEvent::FileProgress {
3330 file_id: file_id.clone(),
3331 bytes_received: bytes_so_far,
3332 total_bytes: expected_size,
3333 });
3334 }
3335 Ok(Some(completed)) => {
3336 let _ = repo::update_attachment_paths(
3337 &self.db,
3338 room_id,
3339 &file_id,
3340 Some(&completed.cache_path.to_string_lossy()),
3341 None,
3342 );
3343 let _ = repo::update_attachment_status(
3344 &self.db,
3345 room_id,
3346 &file_id,
3347 AttachmentStatus::Ready,
3348 None,
3349 );
3350 let _ = self.app_event_tx.send(AppEvent::FileReady {
3351 file_id: file_id.clone(),
3352 });
3353 }
3354 Err(e) => {
3355 let msg = e.to_string();
3356 warn!(%msg, "chunk processing failed");
3357 let _ = repo::update_attachment_status(
3358 &self.db,
3359 room_id,
3360 &file_id,
3361 AttachmentStatus::Failed,
3362 Some(&msg),
3363 );
3364 let _ = self.app_event_tx.send(AppEvent::FileFailed {
3365 file_id: file_id.clone(),
3366 reason: msg,
3367 });
3368 }
3369 }
3370 }
3371
3372 fn maybe_emit_mention(&self, room_id: &str, body: &str) {
3375 let full = self.identity.fingerprint().to_lowercase();
3376 let short: &str = full.split('-').next().unwrap_or(&full);
3378 let lower = body.to_lowercase();
3379 let hit = lower.contains(full.as_str())
3383 || lower
3384 .split(|c: char| !c.is_ascii_hexdigit())
3385 .any(|tok| tok == short);
3386 if hit {
3387 let _ = self.app_event_tx.send(AppEvent::MentionReceived {
3388 room_id: room_id.to_string(),
3389 body: body.to_string(),
3390 });
3391 }
3392 }
3393
3394 fn decrypt_attachment(
3395 &self,
3396 room_id: &str,
3397 sender_fingerprint: &str,
3398 ciphertext: &[u8],
3399 meta: &EncryptedFileMeta,
3400 ) -> Result<Vec<u8>> {
3401 let mut rooms = self.active_rooms.lock().unwrap();
3402 let room = rooms
3403 .get_mut(room_id)
3404 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
3405 let crypto = room
3406 .crypto
3407 .as_mut()
3408 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
3409 file_encryption::decrypt_file(ciphertext, meta, crypto, sender_fingerprint)
3410 }
3411
3412 pub async fn go_dark(&self, master_passphrase: &str) -> Result<()> {
3424 let no_master = self.session_persist_key == [0u8; 32];
3425 if !no_master {
3426 let salt = storage::keychain::load_or_create_salt()?;
3427 let candidate_master =
3428 storage::keychain::derive_master_key(master_passphrase, &salt)?;
3429 let candidate_subkey =
3430 storage::keychain::derive_subkey(&candidate_master, b"megolm-persist");
3431 if !ct_eq_32(&candidate_subkey, &self.session_persist_key) {
3432 return Err(HuddleError::Other(
3433 "incorrect master passphrase".into(),
3434 ));
3435 }
3436 }
3437
3438 let room_ids: Vec<String> = self
3439 .active_rooms
3440 .lock()
3441 .unwrap()
3442 .keys()
3443 .cloned()
3444 .collect();
3445 let _ = tokio::time::timeout(Duration::from_secs(2), async {
3446 for room_id in &room_ids {
3447 if let Err(e) = self.leave_room(room_id).await {
3448 warn!(%room_id, %e, "go_dark: leave_room failed");
3449 }
3450 }
3451 })
3452 .await;
3453
3454 self.network.shutdown().await;
3455 tokio::time::sleep(Duration::from_millis(300)).await;
3456
3457 let data_dir = config::data_dir();
3458 let candidates = [
3459 "huddle.db",
3460 "huddle.db-shm",
3461 "huddle.db-wal",
3462 "keychain.salt",
3463 "huddle.log",
3464 "config.toml",
3465 ];
3466 for name in &candidates {
3467 let path = data_dir.join(name);
3468 wipe_file(&path);
3469 }
3470 if let Ok(read) = std::fs::read_dir(&data_dir) {
3471 for entry in read.flatten() {
3472 if let Some(name) = entry.file_name().to_str() {
3473 if name.starts_with("huddle.log.") {
3474 wipe_file(&entry.path());
3475 }
3476 }
3477 }
3478 }
3479 let _ = std::fs::remove_dir(&data_dir);
3480
3481 let _ = self.app_event_tx.send(AppEvent::WentDark);
3482 Ok(())
3483 }
3484}
3485
3486fn ct_eq_32(a: &[u8; 32], b: &[u8; 32]) -> bool {
3490 let mut diff = 0u8;
3491 for i in 0..32 {
3492 diff |= a[i] ^ b[i];
3493 }
3494 diff == 0
3495}
3496
3497fn wipe_file(path: &Path) {
3501 use std::io::Write;
3502 if let Ok(meta) = std::fs::metadata(path) {
3503 if let Ok(mut f) = std::fs::OpenOptions::new().write(true).open(path) {
3504 let zeros = vec![0u8; meta.len() as usize];
3505 let _ = f.write_all(&zeros);
3506 let _ = f.sync_all();
3507 }
3508 }
3509 if let Err(e) = std::fs::remove_file(path) {
3510 if e.kind() != std::io::ErrorKind::NotFound {
3511 warn!(?path, %e, "wipe_file: remove failed");
3512 }
3513 }
3514}
3515
3516fn open_with_system(path: &str) -> Result<()> {
3518 #[cfg(target_os = "macos")]
3519 let cmd = "open";
3520 #[cfg(target_os = "linux")]
3521 let cmd = "xdg-open";
3522 #[cfg(target_os = "windows")]
3523 let cmd = "cmd";
3524 #[cfg(target_os = "windows")]
3525 let args = vec!["/C", "start", "", path];
3526 #[cfg(not(target_os = "windows"))]
3527 let args = vec![path];
3528
3529 std::process::Command::new(cmd)
3530 .args(args)
3531 .spawn()
3532 .map_err(|e| HuddleError::Other(format!("spawn opener: {e}")))?;
3533 Ok(())
3534}
3535
3536static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
3539 std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
3540
3541pub fn salt_len() -> usize {
3546 SALT_LEN
3547}
3548
3549fn now_unix() -> i64 {
3550 SystemTime::now()
3551 .duration_since(UNIX_EPOCH)
3552 .unwrap()
3553 .as_secs() as i64
3554}
3555
3556fn now_unix_ms() -> i64 {
3557 SystemTime::now()
3558 .duration_since(UNIX_EPOCH)
3559 .unwrap()
3560 .as_millis() as i64
3561}
3562
3563fn generate_join_passphrase() -> String {
3569 use rand::RngCore;
3570 let mut bytes = [0u8; 16];
3571 rand::thread_rng().fill_bytes(&mut bytes);
3572 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes)
3575}
3576
3577fn generate_alphanumeric_code(len: usize) -> String {
3582 use rand::Rng;
3583 const ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
3584 let mut rng = rand::thread_rng();
3585 let mut out = String::with_capacity(len + 1);
3586 for i in 0..len {
3587 if i == 4 && len == 8 {
3588 out.push('-'); }
3590 let idx = rng.gen_range(0..ALPHABET.len());
3591 out.push(ALPHABET[idx] as char);
3592 }
3593 out
3594}
3595
3596#[cfg(test)]
3597mod parser_tests {
3598 use super::parse_dial_address;
3599
3600 #[test]
3601 fn parses_ipv4_port() {
3602 let m = parse_dial_address("10.3.72.53:9027").unwrap();
3603 assert_eq!(m.to_string(), "/ip4/10.3.72.53/tcp/9027");
3604 }
3605
3606 #[test]
3607 fn parses_bracketed_ipv6() {
3608 let m = parse_dial_address("[::1]:9027").unwrap();
3609 assert_eq!(m.to_string(), "/ip6/::1/tcp/9027");
3610 }
3611
3612 #[test]
3613 fn rejects_unbracketed_ipv6() {
3614 let err = parse_dial_address("fe80::1:9027").unwrap_err();
3615 assert!(err.to_string().contains("brackets"));
3616 }
3617
3618 #[test]
3619 fn passes_through_raw_multiaddr() {
3620 let m = parse_dial_address("/ip4/1.2.3.4/tcp/9000").unwrap();
3621 assert_eq!(m.to_string(), "/ip4/1.2.3.4/tcp/9000");
3622 }
3623
3624 #[test]
3625 fn empty_address_is_error() {
3626 assert!(parse_dial_address(" ").is_err());
3627 }
3628
3629 #[test]
3630 fn rejects_bad_port() {
3631 assert!(parse_dial_address("1.2.3.4:notaport").is_err());
3632 }
3633}