1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use base64::engine::general_purpose::STANDARD as B64;
9use base64::Engine;
10use libp2p::{Multiaddr, PeerId};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info, warn};
13
14use crate::config;
15use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
16use crate::crypto::RoomCrypto;
17use crate::error::{HuddleError, Result};
18use crate::files::encryption::{self as file_encryption, EncryptedFileMeta};
19use crate::files::FileManager;
20use crate::identity::Identity;
21use crate::network::events::NetworkEvent;
22use crate::network::protocol::{encode_wire, RoomAnnouncement, RoomMessage, WireMessage};
23use crate::network::{self, NetworkHandle, NetworkMode};
24use crate::storage::repo::{
25 self, derive_room_id, AttachmentStatus, KnownPeer, StoredAttachment, StoredRoom,
26 StoredRoomMember,
27};
28use crate::storage::{self, Db};
29
30pub use self::events::{AppEvent, DiscoveredRoom};
31
/// A stored known peer combined with its live connection state.
///
/// Produced by `AppHandle::known_peers`.
#[derive(Debug, Clone)]
pub struct KnownPeerStatus {
    /// Address string the peer is stored under.
    pub address: String,
    /// Optional label stored alongside the address.
    pub label: Option<String>,
    /// Stored `last_connected_at` value, if any.
    /// NOTE(review): presumably Unix seconds, like other timestamps here — confirm.
    pub last_connected_at: Option<i64>,
    /// Peer id currently recorded for this address in the connected-dial map;
    /// `None` when the map has no entry for the address.
    pub connected_peer_id: Option<PeerId>,
}
41
42pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
45 let trimmed = input.trim();
46 if trimmed.is_empty() {
47 return Err(HuddleError::Other("address is empty".into()));
48 }
49 if trimmed.starts_with('/') {
50 return trimmed
51 .parse::<Multiaddr>()
52 .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
53 }
54 if let Some(rest) = trimmed.strip_prefix('[') {
55 let (host, port) = rest
56 .split_once("]:")
57 .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
58 let port: u16 = port
59 .parse()
60 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
61 return format!("/ip6/{}/tcp/{}", host, port)
62 .parse::<Multiaddr>()
63 .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
64 }
65 let (host, port) = trimmed
66 .rsplit_once(':')
67 .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
68 if host.contains(':') {
69 return Err(HuddleError::Other(format!(
70 "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
71 )));
72 }
73 let port: u16 = port
74 .parse()
75 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
76 format!("/ip4/{}/tcp/{}", host, port)
77 .parse::<Multiaddr>()
78 .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
79}
80
/// In-memory state for a room this node is currently participating in.
struct ActiveRoom {
    /// Stored room record (id, name, creator, encryption flag, salt, timestamps).
    info: StoredRoom,
    /// Room crypto state; set for encrypted rooms, `None` for plain ones.
    crypto: Option<RoomCrypto>,
    /// Key derived from the room passphrase; used to wrap session keys.
    passphrase_key: Option<[u8; KEY_LEN]>,
    /// Fingerprints of the members currently known for this room.
    members: HashSet<String>,
    /// NOTE(review): presumably fingerprint -> time of the latest typing
    /// notice (see `TYPING_TTL_SECS`) — confirm against the typing handlers.
    typers: HashMap<String, i64>,
    /// When true, `send_room_message` refuses to send into this room.
    #[allow(dead_code)]
    read_only: bool,
    /// NOTE(review): looks like (code, timestamp) pairs for issued join
    /// codes — confirm; not used in the visible part of the file.
    issued_codes: Vec<(String, i64)>,
}
106
/// NOTE(review): presumably how long (seconds) a typing notice stays valid —
/// confirm; the code that reads this constant is outside the visible chunk.
const TYPING_TTL_SECS: i64 = 3;

/// Age in seconds after which the pruner drops a discovered room that has not
/// been refreshed (compared against `last_seen`).
const DISCOVERED_TTL_SECS: i64 = 45;
/// Period in seconds of the ticker that re-announces every active room.
const ANNOUNCE_INTERVAL_SECS: u64 = 15;
113
/// State kept for one entry of `AppHandle::sas_flows`.
///
/// NOTE(review): the field notes below are inferred from names and types
/// only — the handlers that drive this struct are outside the visible chunk.
struct SasFlow {
    /// Room the flow belongs to (presumably a room id — confirm).
    room_id: String,
    /// Fingerprint of the other party (presumably — confirm).
    partner_fingerprint: String,
    /// Our X25519 secret for this flow.
    our_secret: x25519_dalek::StaticSecret,
    /// SAS code, once available; `None` until then (presumably — confirm).
    sas_code: Option<crate::crypto::sas::SasCode>,
    /// Presumably set once the local user confirmed the code — confirm.
    our_confirmed: bool,
    /// Presumably set once the partner confirmed the code — confirm.
    their_confirmed: bool,
}
126
/// Cloneable handle to the running application.
///
/// All mutable state sits behind `Arc<Mutex<..>>`, so clones share it; the
/// background tasks spawned at start-up each hold their own clone.
#[derive(Clone)]
pub struct AppHandle {
    /// Local identity (fingerprint, peer id, public key bytes).
    identity: Arc<Identity>,
    /// Handle used to talk to the network layer.
    network: NetworkHandle,
    /// Network mode the app was started with.
    mode: NetworkMode,
    /// Rooms we are currently in, keyed by room id.
    active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
    /// Rooms learned from announcements, keyed by room id; stale entries are
    /// pruned periodically.
    discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
    /// Encrypted rooms found in the database at start-up that have not been
    /// rejoined yet, keyed by room id.
    restorable_rooms: Arc<Mutex<HashMap<String, StoredRoom>>>,
    /// Address string -> peer id for currently connected peers.
    connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
    /// File manager rooted at the data directory.
    file_manager: Arc<FileManager>,
    /// Database handle.
    db: Db,
    /// Key handed to `RoomCrypto` when creating or loading room sessions;
    /// all zeroes when the app was started without a master key.
    session_persist_key: [u8; 32],
    /// In-flight SAS flows.
    /// NOTE(review): key semantics not visible in this chunk — confirm.
    sas_flows: Arc<Mutex<HashMap<String, SasFlow>>>,
    /// X25519 secrets kept per pending code.
    /// NOTE(review): key semantics not visible in this chunk — confirm.
    pending_code_secrets: Arc<Mutex<HashMap<String, x25519_dalek::StaticSecret>>>,
    /// Broadcast sender for application events; see `subscribe`.
    app_event_tx: broadcast::Sender<AppEvent>,
}
157
158impl AppHandle {
/// Starts the app with defaults: mDNS mode, port `0`, no master key and no
/// relays. See `start_with_options` for the errors that can be returned.
pub async fn start() -> Result<Self> {
    Self::start_with_options(NetworkMode::Mdns, 0, None, Vec::new()).await
}
162
163 pub async fn start_with_options(
164 mode: NetworkMode,
165 port: u16,
166 master_key: Option<&[u8; 32]>,
167 relays: Vec<Multiaddr>,
168 ) -> Result<Self> {
169 config::ensure_data_dir()?;
170 let session_persist_key = match master_key {
175 Some(mk) => storage::keychain::derive_subkey(mk, b"megolm-persist"),
176 None => [0u8; 32],
177 };
178 let db = storage::open_db(&config::db_path(), master_key)?;
179 Self::start_with_db_and_options(db, mode, port, session_persist_key, relays).await
180 }
181
/// Starts the app on an already-open database with defaults: mDNS mode,
/// port `0`, an all-zero session persistence key and no relays.
pub async fn start_with_db(db: Db) -> Result<Self> {
    Self::start_with_db_and_options(db, NetworkMode::Mdns, 0, [0u8; 32], Vec::new()).await
}
185
186 pub async fn start_with_db_and_options(
187 db: Db,
188 mode: NetworkMode,
189 port: u16,
190 session_persist_key: [u8; 32],
191 relays: Vec<Multiaddr>,
192 ) -> Result<Self> {
193 let identity = Self::load_or_create_identity(&db)?;
194 let identity = Arc::new(identity);
195 info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, relay_count = relays.len(), "identity loaded");
196
197 let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
198 let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
199 let network =
200 network::start_network_with(&identity, net_event_tx, mode, port, relays)?;
201
202 let active_rooms = Arc::new(Mutex::new(HashMap::new()));
203 let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
204 let restorable_rooms = Arc::new(Mutex::new(HashMap::new()));
205 let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
206 let file_manager = Arc::new(FileManager::new(&config::data_dir())?);
207
208 let handle = Self {
209 identity,
210 network,
211 mode,
212 active_rooms,
213 discovered_rooms,
214 restorable_rooms,
215 connected_dial_addrs,
216 file_manager,
217 db,
218 session_persist_key,
219 sas_flows: Arc::new(Mutex::new(HashMap::new())),
220 pending_code_secrets: Arc::new(Mutex::new(HashMap::new())),
221 app_event_tx,
222 };
223
224 handle.spawn_event_processor(net_event_rx);
225 handle.spawn_announcement_ticker();
226 handle.spawn_discovered_room_pruner();
227 handle.spawn_known_peer_reconnector();
228 handle.restore_rooms_from_db().await;
229
230 Ok(handle)
231 }
232
/// Returns the network mode this handle was started with.
pub fn mode(&self) -> NetworkMode {
    self.mode
}
236
/// Creates a new receiver on the application event broadcast channel.
pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
    self.app_event_tx.subscribe()
}
240
/// Returns the fingerprint of the local identity.
pub fn fingerprint(&self) -> &str {
    self.identity.fingerprint()
}
244
/// Returns the peer id of the local identity.
pub fn peer_id(&self) -> PeerId {
    self.identity.peer_id()
}
248
249 pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
250 let now = now_unix();
251 let mut by_id: HashMap<String, DiscoveredRoom> = self
252 .discovered_rooms
253 .lock()
254 .unwrap()
255 .clone();
256
257 for room in self.active_rooms.lock().unwrap().values() {
261 let entry = DiscoveredRoom {
262 room_id: room.info.id.clone(),
263 name: room.info.name.clone(),
264 encrypted: room.info.encrypted,
265 member_count: room.members.len() as u32,
266 creator_fingerprint: room.info.creator_fingerprint.clone(),
267 last_seen: now,
268 restorable: false,
269 };
270 by_id
271 .entry(room.info.id.clone())
272 .and_modify(|d| {
273 d.last_seen = now;
274 if entry.member_count > d.member_count {
275 d.member_count = entry.member_count;
276 }
277 d.restorable = false;
278 })
279 .or_insert(entry);
280 }
281
282 for (id, stored) in self.restorable_rooms.lock().unwrap().iter() {
286 if by_id.contains_key(id) {
287 continue;
288 }
289 by_id.insert(
290 id.clone(),
291 DiscoveredRoom {
292 room_id: id.clone(),
293 name: stored.name.clone(),
294 encrypted: stored.encrypted,
295 member_count: 0,
296 creator_fingerprint: stored.creator_fingerprint.clone(),
297 last_seen: stored.last_active.unwrap_or(stored.created_at),
298 restorable: true,
299 },
300 );
301 }
302
303 let mut v: Vec<DiscoveredRoom> = by_id.into_values().collect();
304 v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
305 v
306 }
307
/// Returns the ids of all currently active rooms, in the map's iteration
/// order (unspecified for a `HashMap`).
pub fn active_room_ids(&self) -> Vec<String> {
    self.active_rooms.lock().unwrap().keys().cloned().collect()
}
311
312 pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
313 self.active_rooms
314 .lock()
315 .unwrap()
316 .get(room_id)
317 .map(|r| r.info.clone())
318 }
319
320 pub fn room_members(&self, room_id: &str) -> Vec<String> {
321 self.active_rooms
322 .lock()
323 .unwrap()
324 .get(room_id)
325 .map(|r| {
326 let mut m: Vec<String> = r.members.iter().cloned().collect();
327 m.sort();
328 m
329 })
330 .unwrap_or_default()
331 }
332
/// Fetches stored messages for `room_id` from the database, passing `limit`
/// through to `repo::get_room_messages`.
pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
    repo::get_room_messages(&self.db, room_id, limit)
}
336
/// Searches the stored messages of `room_id` for `query`, passing `limit`
/// through to `repo::search_room_messages`.
pub fn search_room_messages(
    &self,
    room_id: &str,
    query: &str,
    limit: i64,
) -> Result<Vec<repo::StoredRoomMessage>> {
    repo::search_room_messages(&self.db, room_id, query, limit)
}
345
346 pub async fn start_room(
348 &self,
349 name: &str,
350 encrypted: bool,
351 passphrase: Option<&str>,
352 ) -> Result<String> {
353 if encrypted && passphrase.is_none() {
354 return Err(HuddleError::Other(
355 "encrypted room requires a passphrase".into(),
356 ));
357 }
358
359 let created_at = now_unix();
360 let creator_fp = self.identity.fingerprint().to_string();
361 let room_id = derive_room_id(&creator_fp, name, created_at);
362
363 let (passphrase_salt, passphrase_key) = if encrypted {
364 let salt = passphrase::random_salt();
365 let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
366 (Some(salt.to_vec()), Some(key))
367 } else {
368 (None, None)
369 };
370
371 let info = StoredRoom {
372 id: room_id.clone(),
373 name: name.to_string(),
374 creator_fingerprint: creator_fp.clone(),
375 encrypted,
376 passphrase_salt: passphrase_salt.clone(),
377 created_at,
378 last_active: Some(created_at),
379 };
380 repo::insert_room(&self.db, &info)?;
381
382 let crypto = if encrypted {
383 Some(RoomCrypto::new_for_room(
384 self.db.clone(),
385 room_id.clone(),
386 creator_fp.clone(),
387 self.session_persist_key,
388 )?)
389 } else {
390 None
391 };
392
393 let mut members = HashSet::new();
394 members.insert(creator_fp.clone());
395
396 repo::upsert_room_member(
400 &self.db,
401 &StoredRoomMember {
402 room_id: room_id.clone(),
403 peer_id: String::new(),
404 fingerprint: creator_fp.clone(),
405 last_seen: Some(created_at),
406 verified: true, ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
408 role: "owner".into(),
409 },
410 )?;
411
412 self.active_rooms.lock().unwrap().insert(
413 room_id.clone(),
414 ActiveRoom {
415 info: info.clone(),
416 crypto,
417 passphrase_key,
418 members,
419 typers: HashMap::new(),
420 read_only: false,
421 issued_codes: Vec::new(),
422 },
423 );
424
425 self.network.subscribe_room(room_id.clone()).await;
426 self.announce_room_now(&info, 1).await;
427
428 let app = self.clone();
431 let rid = room_id.clone();
432 tokio::spawn(async move {
433 tokio::time::sleep(Duration::from_millis(500)).await;
434 if let Err(e) = app.broadcast_member_announce(&rid).await {
435 warn!(%e, "broadcast member announce");
436 }
437 });
438
439 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
440 room_id: room_id.clone(),
441 });
442
443 Ok(room_id)
444 }
445
446 pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
450 let (name, creator_fingerprint, encrypted, salt_opt) = {
452 if let Some(d) = self.discovered_rooms.lock().unwrap().get(room_id).cloned() {
453 let salt = self.get_room_salt(room_id);
454 (d.name, d.creator_fingerprint, d.encrypted, salt)
455 } else if let Some(stored) = self.restorable_rooms.lock().unwrap().get(room_id).cloned()
456 {
457 (
458 stored.name,
459 stored.creator_fingerprint,
460 stored.encrypted,
461 stored.passphrase_salt,
462 )
463 } else if let Some(stored) = repo::get_room(&self.db, room_id)? {
464 (
465 stored.name,
466 stored.creator_fingerprint,
467 stored.encrypted,
468 stored.passphrase_salt,
469 )
470 } else {
471 return Err(HuddleError::Other(format!("room {room_id} not found")));
472 }
473 };
474
475 if encrypted && passphrase.is_none() {
476 return Err(HuddleError::Other(
477 "encrypted room requires a passphrase".into(),
478 ));
479 }
480
481 let passphrase_key = if encrypted {
482 let salt = salt_opt
483 .clone()
484 .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
485 Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
486 } else {
487 None
488 };
489
490 let info = StoredRoom {
491 id: room_id.to_string(),
492 name,
493 creator_fingerprint,
494 encrypted,
495 passphrase_salt: salt_opt.clone(),
496 created_at: now_unix(),
497 last_active: Some(now_unix()),
498 };
499 repo::insert_room(&self.db, &info)?;
500
501 let crypto = if encrypted {
502 let our_fp = self.identity.fingerprint().to_string();
505 let existing = RoomCrypto::load(
506 self.db.clone(),
507 room_id.to_string(),
508 our_fp.clone(),
509 self.session_persist_key,
510 )?;
511 Some(match existing {
512 Some(c) => c,
513 None => RoomCrypto::new_for_room(
514 self.db.clone(),
515 room_id.to_string(),
516 our_fp,
517 self.session_persist_key,
518 )?,
519 })
520 } else {
521 None
522 };
523
524 let mut members = HashSet::new();
525 members.insert(self.identity.fingerprint().to_string());
526
527 self.active_rooms.lock().unwrap().insert(
528 room_id.to_string(),
529 ActiveRoom {
530 info: info.clone(),
531 crypto,
532 passphrase_key,
533 members,
534 typers: HashMap::new(),
535 read_only: false,
536 issued_codes: Vec::new(),
537 },
538 );
539 self.restorable_rooms.lock().unwrap().remove(room_id);
541
542 self.network.subscribe_room(room_id.to_string()).await;
543
544 let app = self.clone();
545 let rid = room_id.to_string();
546 tokio::spawn(async move {
547 tokio::time::sleep(Duration::from_millis(500)).await;
548 if let Err(e) = app.broadcast_member_announce(&rid).await {
549 warn!(%e, "broadcast member announce");
550 }
551 let req = RoomMessage::SessionKeyRequest {
553 requester_fingerprint: app.identity.fingerprint().to_string(),
554 };
555 if let Ok(bytes) = encode_wire(&req) {
556 app.network.publish_room_message(rid.clone(), bytes).await;
557 }
558 });
559
560 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
561 room_id: room_id.to_string(),
562 });
563
564 Ok(())
565 }
566
567 async fn restore_rooms_from_db(&self) {
572 let rooms = match repo::list_rooms(&self.db) {
573 Ok(v) => v,
574 Err(e) => {
575 warn!(%e, "list rooms on restore");
576 return;
577 }
578 };
579 let our_fp = self.identity.fingerprint().to_string();
580 let count = rooms.len();
581 for info in rooms {
582 if info.encrypted {
583 self.restorable_rooms
584 .lock()
585 .unwrap()
586 .insert(info.id.clone(), info);
587 continue;
588 }
589 let mut members = HashSet::new();
590 members.insert(our_fp.clone());
591 if let Ok(stored_members) = repo::list_room_members(&self.db, &info.id) {
592 for m in stored_members {
593 members.insert(m.fingerprint);
594 }
595 }
596 let member_count = members.len() as u32;
597 self.active_rooms.lock().unwrap().insert(
598 info.id.clone(),
599 ActiveRoom {
600 info: info.clone(),
601 crypto: None,
602 passphrase_key: None,
603 members,
604 typers: HashMap::new(),
605 read_only: false,
606 issued_codes: Vec::new(),
607 },
608 );
609 self.network.subscribe_room(info.id.clone()).await;
610 self.announce_room_now(&info, member_count).await;
611 info!(room_id = %info.id, name = %info.name, "restored room");
612 }
613 if count > 0 {
614 debug!(count, "restored rooms from db");
615 }
616 }
617
618 pub async fn leave_room(&self, room_id: &str) -> Result<bool> {
623 let leave_msg = RoomMessage::MemberLeave {
625 sender_fingerprint: self.identity.fingerprint().to_string(),
626 };
627 let dispatched = match encode_wire(&leave_msg) {
628 Ok(bytes) => {
629 self.network
630 .publish_room_message(room_id.to_string(), bytes)
631 .await;
632 true
633 }
634 Err(e) => {
635 warn!(%e, %room_id, "failed to encode MemberLeave notice");
636 false
637 }
638 };
639
640 self.active_rooms.lock().unwrap().remove(room_id);
641 self.network.unsubscribe_room(room_id.to_string()).await;
642
643 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
644 room_id: room_id.to_string(),
645 });
646 Ok(dispatched)
647 }
648
649 pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
650 let our_fp = self.identity.fingerprint().to_string();
651 let msg = {
652 let mut rooms = self.active_rooms.lock().unwrap();
653 let room = rooms
654 .get_mut(room_id)
655 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
656
657 if room.read_only {
658 return Err(HuddleError::Other(
659 "this room is read-only — you joined via code without the passphrase. Ask an owner for the passphrase or wait for a key rotation that includes you.".into(),
660 ));
661 }
662
663 if room.info.encrypted {
664 let crypto = room
665 .crypto
666 .as_mut()
667 .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
668 let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
669 RoomMessage::Encrypted {
670 sender_fingerprint: our_fp.clone(),
671 session_id,
672 ciphertext_b64: base64::Engine::encode(
673 &base64::engine::general_purpose::STANDARD,
674 &ct_bytes,
675 ),
676 }
677 } else {
678 RoomMessage::Plain {
679 sender_fingerprint: our_fp.clone(),
680 body: body.to_string(),
681 }
682 }
683 };
684
685 let bytes = encode_wire(&msg)?;
686 self.network
687 .publish_room_message(room_id.to_string(), bytes)
688 .await;
689
690 let now = now_unix();
691 let msg_id =
692 repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
693 repo::update_room_last_active(&self.db, room_id, now)?;
694
695 let _ = self.app_event_tx.send(AppEvent::MessageSent {
696 room_id: room_id.to_string(),
697 body: body.to_string(),
698 message_id: msg_id,
699 });
700
701 Ok(())
702 }
703
/// Asks the network layer to shut down and waits for that call to return.
pub async fn shutdown(&self) {
    self.network.shutdown().await;
}
707
708 pub async fn dial(&self, input: &str) -> Result<()> {
717 let multiaddr = parse_dial_address(input)?;
718 let canonical = multiaddr.to_string();
719 info!(%canonical, "dialing");
720
721 repo::upsert_known_peer(
722 &self.db,
723 &KnownPeer {
724 address: canonical.clone(),
725 label: None,
726 last_connected_at: None,
727 last_attempt_at: Some(now_unix()),
728 created_at: now_unix(),
729 fingerprint: None,
733 trusted: false,
734 },
735 )?;
736
737 let _ = self.app_event_tx.send(AppEvent::Dialing {
738 address: canonical.clone(),
739 });
740 self.network.dial(multiaddr).await;
741 Ok(())
742 }
743
744 pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
745 let connected = self.connected_dial_addrs.lock().unwrap().clone();
746 let stored = repo::list_known_peers(&self.db).unwrap_or_default();
747 stored
748 .into_iter()
749 .map(|p| {
750 let connected_peer = connected.get(&p.address).copied();
751 KnownPeerStatus {
752 address: p.address,
753 label: p.label,
754 last_connected_at: p.last_connected_at,
755 connected_peer_id: connected_peer,
756 }
757 })
758 .collect()
759 }
760
/// Removes a known peer from the database and from the live connection map.
///
/// # Errors
/// Propagates the database error; in that case the live map is untouched.
pub async fn forget_peer(&self, address: &str) -> Result<()> {
    repo::forget_known_peer(&self.db, address)?;
    self.connected_dial_addrs.lock().unwrap().remove(address);
    Ok(())
}
766
/// Dials `address` again; identical to calling `dial` with the same input.
pub async fn redial(&self, address: &str) -> Result<()> {
    self.dial(address).await
}
771
772 pub async fn accept_inbound(&self, peer_id: PeerId, address: &str) {
777 self.network.accept_inbound(peer_id).await;
778 self.connected_dial_addrs
779 .lock()
780 .unwrap()
781 .insert(address.to_string(), peer_id);
782 }
783
/// Rejects a pending inbound connection and records the fingerprint as
/// blocked (with the current time) in the database.
///
/// # Errors
/// Propagates the database error; the rejection has already been sent to
/// the network at that point.
pub async fn reject_inbound(&self, peer_id: PeerId, fingerprint: &str) -> Result<()> {
    self.network.reject_inbound(peer_id).await;
    repo::block_peer(&self.db, fingerprint, now_unix())?;
    Ok(())
}
793
794 pub async fn trust_inbound(
797 &self,
798 peer_id: PeerId,
799 fingerprint: &str,
800 address: &str,
801 ) -> Result<()> {
802 self.network.accept_inbound(peer_id).await;
803 self.connected_dial_addrs
804 .lock()
805 .unwrap()
806 .insert(address.to_string(), peer_id);
807 repo::upsert_known_peer(
811 &self.db,
812 &KnownPeer {
813 address: address.to_string(),
814 label: None,
815 last_connected_at: Some(now_unix()),
816 last_attempt_at: Some(now_unix()),
817 created_at: now_unix(),
818 fingerprint: Some(fingerprint.to_string()),
819 trusted: true,
820 },
821 )?;
822 Ok(())
823 }
824
825 fn spawn_known_peer_reconnector(&self) {
826 let handle = self.clone();
827 tokio::spawn(async move {
828 tokio::time::sleep(Duration::from_millis(500)).await;
830 let known = repo::list_known_peers(&handle.db).unwrap_or_default();
831 for (i, peer) in known.into_iter().enumerate() {
835 let handle = handle.clone();
836 tokio::spawn(async move {
837 let jitter = (peer.address.len() as u64 * 37) % 200;
840 tokio::time::sleep(Duration::from_millis(150 * i as u64 + jitter)).await;
841 if let Err(e) = handle.dial(&peer.address).await {
842 debug!(%e, addr = %peer.address, "auto-reconnect failed");
843 }
844 });
845 }
846 });
847 }
848
849 fn load_or_create_identity(db: &Db) -> Result<Identity> {
854 if let Some(stored) = repo::load_identity(db)? {
855 let mut bytes = [0u8; 32];
856 bytes.copy_from_slice(&stored.ed25519_secret);
857 Identity::from_secret_bytes(bytes)
858 } else {
859 let id = Identity::generate()?;
860 repo::save_identity(db, &id.secret_bytes(), now_unix())?;
861 Ok(id)
862 }
863 }
864
865 fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
866 self.active_rooms
867 .lock()
868 .unwrap()
869 .get(room_id)
870 .and_then(|r| r.info.passphrase_salt.clone())
871 .or_else(|| {
872 ROOM_SALT_CACHE
874 .lock()
875 .unwrap()
876 .get(room_id)
877 .cloned()
878 })
879 }
880
881 async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
882 let owner_fingerprints =
883 repo::list_room_owners(&self.db, &info.id).unwrap_or_default();
884 let verified_only = repo::get_room_verified_only(&self.db, &info.id).unwrap_or(false);
885 let ann = RoomAnnouncement {
886 room_id: info.id.clone(),
887 name: info.name.clone(),
888 encrypted: info.encrypted,
889 passphrase_salt: info.passphrase_salt.clone(),
890 member_count,
891 creator_fingerprint: info.creator_fingerprint.clone(),
892 announced_at: now_unix(),
893 owner_fingerprints,
894 verified_only,
895 };
896 self.network.announce_room(ann).await;
897 }
898
899 async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
900 let our_fp = self.identity.fingerprint().to_string();
901 let wrapped = {
902 let mut rooms = self.active_rooms.lock().unwrap();
903 let room = rooms
904 .get_mut(room_id)
905 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
906 if room.info.encrypted {
907 let crypto = room.crypto.as_mut().unwrap();
908 let session_key = crypto.our_session_key_b64();
909 let passphrase_key = room
910 .passphrase_key
911 .as_ref()
912 .ok_or_else(|| HuddleError::Session("missing passphrase key".into()))?;
913 Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
914 } else {
915 None
916 }
917 };
918 let display_name = repo::get_display_name(&self.db).unwrap_or(None);
919 let msg = RoomMessage::MemberAnnounce {
920 sender_fingerprint: our_fp,
921 wrapped_session_key: wrapped,
922 display_name,
923 sender_ed25519_pubkey: Some(B64.encode(self.identity.public_bytes())),
924 };
925 let bytes = encode_wire(&msg)?;
926 self.network
927 .publish_room_message(room_id.to_string(), bytes)
928 .await;
929 Ok(())
930 }
931
932 fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
933 let handle = self.clone();
934 tokio::spawn(async move {
935 while let Some(event) = net_rx.recv().await {
936 handle.process_network_event(event).await;
937 }
938 info!("event processor stopped");
939 });
940 }
941
942 fn spawn_announcement_ticker(&self) {
943 let handle = self.clone();
944 tokio::spawn(async move {
945 let mut interval =
946 tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
947 interval.tick().await; loop {
949 interval.tick().await;
950 let snapshot: Vec<(StoredRoom, u32)> = {
951 let active = handle.active_rooms.lock().unwrap();
952 active
953 .values()
954 .map(|r| (r.info.clone(), r.members.len() as u32))
955 .collect()
956 };
957 for (info, member_count) in snapshot {
958 handle.announce_room_now(&info, member_count).await;
959 }
960 }
961 });
962 }
963
964 fn spawn_discovered_room_pruner(&self) {
965 let handle = self.clone();
966 tokio::spawn(async move {
967 let mut interval = tokio::time::interval(Duration::from_secs(10));
968 interval.tick().await;
969 loop {
970 interval.tick().await;
971 let now = now_unix();
972 let mut to_drop = Vec::new();
973 {
974 let mut map = handle.discovered_rooms.lock().unwrap();
975 map.retain(|id, r| {
976 if now - r.last_seen > DISCOVERED_TTL_SECS {
977 to_drop.push(id.clone());
978 false
979 } else {
980 true
981 }
982 });
983 }
984 for id in to_drop {
985 let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
986 }
987 }
988 });
989 }
990
/// Handles a single event from the network layer.
///
/// Depending on the event this updates in-memory state (connection map,
/// discovered rooms, salt cache), writes known-peer rows to the database,
/// forwards an `AppEvent` to subscribers, or answers an inbound dial.
/// Send failures on the app event channel and known-peer upsert failures
/// are deliberately ignored.
async fn process_network_event(&self, event: NetworkEvent) {
    match event {
        NetworkEvent::PeerDiscovered { peer_id } => {
            let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
        }
        NetworkEvent::PeerExpired { peer_id } => {
            // Drop every address that was mapped to the expired peer.
            self.connected_dial_addrs
                .lock()
                .unwrap()
                .retain(|_addr, pid| *pid != peer_id);
            let _ = self.app_event_tx.send(AppEvent::PeerExpired { peer_id });
        }
        NetworkEvent::ListeningOn { address } => {
            let _ = self.app_event_tx.send(AppEvent::ListeningOn {
                address: address.to_string(),
            });
        }
        NetworkEvent::RoomAnnouncementReceived(ann) => {
            // Remember the salt so `get_room_salt` can serve a later join.
            if let Some(salt) = &ann.passphrase_salt {
                ROOM_SALT_CACHE
                    .lock()
                    .unwrap()
                    .insert(ann.room_id.clone(), salt.clone());
            }
            let discovered = DiscoveredRoom {
                room_id: ann.room_id.clone(),
                name: ann.name.clone(),
                encrypted: ann.encrypted,
                member_count: ann.member_count,
                creator_fingerprint: ann.creator_fingerprint.clone(),
                last_seen: now_unix(),
                restorable: false,
            };
            // Rooms we are already in are refreshed in the discovered map,
            // but no `RoomDiscovered` event is emitted for them.
            if self.active_rooms.lock().unwrap().contains_key(&ann.room_id) {
                self.discovered_rooms
                    .lock()
                    .unwrap()
                    .insert(ann.room_id.clone(), discovered);
                return;
            }
            self.discovered_rooms
                .lock()
                .unwrap()
                .insert(ann.room_id.clone(), discovered.clone());
            let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
        }
        NetworkEvent::RoomMessageReceived {
            room_id,
            payload,
            from_peer: _,
        } => {
            // Payloads that do not decode as a wire envelope are dropped.
            let wire: WireMessage = match serde_json::from_slice(&payload) {
                Ok(w) => w,
                Err(e) => {
                    warn!(%e, "bad wire envelope");
                    return;
                }
            };
            // Signed envelopes must verify; the verified signer fingerprint
            // is passed on. Plain envelopes carry no verified signer.
            let (msg, verified_signer) = match wire {
                WireMessage::Plain(m) => (m, None),
                WireMessage::Signed(env) => {
                    match crate::crypto::verify_signed(&env) {
                        Ok((m, fp)) => (m, Some(fp)),
                        Err(e) => {
                            warn!(%e, fp = %env.fingerprint, "signed envelope verify failed");
                            return;
                        }
                    }
                }
            };
            self.handle_room_message(&room_id, msg, verified_signer).await;
        }
        NetworkEvent::DialSucceeded { peer_id, address } => {
            let addr_s = address.to_string();
            self.connected_dial_addrs
                .lock()
                .unwrap()
                .insert(addr_s.clone(), peer_id);
            // Record the successful connection; the fingerprint is not
            // known yet at this point, so the row stays untrusted.
            let _ = repo::upsert_known_peer(
                &self.db,
                &KnownPeer {
                    address: addr_s.clone(),
                    label: None,
                    last_connected_at: Some(now_unix()),
                    last_attempt_at: Some(now_unix()),
                    created_at: now_unix(),
                    fingerprint: None,
                    trusted: false,
                },
            );
            let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
                address: addr_s,
                peer_id,
            });
        }
        NetworkEvent::DialFailed { address, error } => {
            let addr_s = address.to_string();
            let _ = self.app_event_tx.send(AppEvent::DialFailed {
                address: addr_s,
                error,
            });
        }
        NetworkEvent::PeerIdentified { peer_id, fingerprint } => {
            // Collect every address currently mapped to this peer; the lock
            // is released before touching the database.
            let matched_addrs: Vec<String> = {
                let map = self.connected_dial_addrs.lock().unwrap();
                map.iter()
                    .filter_map(|(addr, pid)| {
                        if *pid == peer_id {
                            Some(addr.clone())
                        } else {
                            None
                        }
                    })
                    .collect()
            };
            // NOTE(review): these rows are written with `trusted: true`.
            // The map also holds addresses added by `accept_inbound`, so a
            // peer accepted once could become trusted here — confirm this
            // is intended and how `upsert_known_peer` merges `trusted`.
            for addr in matched_addrs {
                let _ = repo::upsert_known_peer(
                    &self.db,
                    &KnownPeer {
                        address: addr,
                        label: None,
                        last_connected_at: Some(now_unix()),
                        last_attempt_at: Some(now_unix()),
                        created_at: now_unix(),
                        fingerprint: Some(fingerprint.clone()),
                        trusted: true,
                    },
                );
            }
        }
        NetworkEvent::RelayReservationEstablished { address } => {
            info!(addr = %address, "relay reservation established");
            // Surfaced to subscribers as a regular listening address.
            let _ = self.app_event_tx.send(AppEvent::ListeningOn {
                address: address.to_string(),
            });
        }
        NetworkEvent::InboundDial {
            peer_id,
            fingerprint,
            address,
        } => {
            // 1. Blocked fingerprints are rejected without asking the user.
            //    A failed lookup counts as "not blocked".
            if repo::is_peer_blocked(&self.db, &fingerprint).unwrap_or(false) {
                info!(%fingerprint, "inbound dial auto-rejected: peer is blocked");
                self.network.reject_inbound(peer_id).await;
                return;
            }
            // 2. With the "verified_only_inbound" setting equal to "1",
            //    only globally verified or trusted fingerprints get through.
            let global_verified_only =
                repo::get_setting(&self.db, "verified_only_inbound")
                    .ok()
                    .flatten()
                    .map(|v| v == "1")
                    .unwrap_or(false);
            if global_verified_only {
                let is_verified =
                    repo::is_globally_verified(&self.db, &fingerprint).unwrap_or(false)
                        || repo::is_fingerprint_trusted(&self.db, &fingerprint)
                            .unwrap_or(false);
                if !is_verified {
                    info!(
                        %fingerprint,
                        "inbound dial auto-rejected: verified-only mode"
                    );
                    self.network.reject_inbound(peer_id).await;
                    return;
                }
            }
            // 3. Trusted fingerprints are accepted without asking the user.
            if repo::is_fingerprint_trusted(&self.db, &fingerprint).unwrap_or(false) {
                info!(%fingerprint, "inbound dial auto-accepted: peer is trusted");
                self.connected_dial_addrs
                    .lock()
                    .unwrap()
                    .insert(address.to_string(), peer_id);
                let _ = repo::upsert_known_peer(
                    &self.db,
                    &KnownPeer {
                        address: address.to_string(),
                        label: None,
                        last_connected_at: Some(now_unix()),
                        last_attempt_at: Some(now_unix()),
                        created_at: now_unix(),
                        fingerprint: Some(fingerprint),
                        trusted: true,
                    },
                );
                self.network.accept_inbound(peer_id).await;
                return;
            }
            // 4. Everything else is forwarded to subscribers as an event.
            let _ = self.app_event_tx.send(AppEvent::InboundDial {
                peer_id,
                fingerprint,
                address: address.to_string(),
            });
        }
    }
}
1222
1223 async fn handle_room_message(
1229 &self,
1230 room_id: &str,
1231 msg: RoomMessage,
1232 verified_signer: Option<String>,
1233 ) {
1234 let our_fp = self.identity.fingerprint().to_string();
1235 match msg {
1236 RoomMessage::MemberAnnounce {
1237 sender_fingerprint,
1238 wrapped_session_key,
1239 display_name,
1240 sender_ed25519_pubkey,
1241 } => {
1242 if sender_fingerprint == our_fp {
1243 return;
1244 }
1245 if repo::is_member_banned(&self.db, room_id, &sender_fingerprint)
1248 .unwrap_or(false)
1249 {
1250 info!(%sender_fingerprint, %room_id, "dropping MemberAnnounce from banned peer");
1251 return;
1252 }
1253 if repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
1260 && !repo::is_globally_verified(&self.db, &sender_fingerprint).unwrap_or(false)
1261 {
1262 info!(
1263 %sender_fingerprint, %room_id,
1264 "dropping MemberAnnounce: room is verified-only and joiner isn't verified"
1265 );
1266 let owners = repo::list_room_owners(&self.db, room_id).unwrap_or_default();
1267 let lowest_owner = owners.iter().min().cloned();
1268 if lowest_owner.as_deref() == Some(&our_fp) {
1269 let msg = RoomMessage::JoinRefused {
1270 room_id: room_id.to_string(),
1271 target_fingerprint: sender_fingerprint.clone(),
1272 reason: "room requires SAS verification — ask an existing member to verify you".into(),
1273 };
1274 if let Ok(env) = crate::crypto::sign_message(&self.identity, &msg) {
1275 if let Ok(bytes) =
1276 crate::network::protocol::encode_wire_signed(&env)
1277 {
1278 self.network
1279 .publish_room_message(room_id.to_string(), bytes)
1280 .await;
1281 }
1282 }
1283 }
1284 return;
1285 }
1286 let need_inbound = {
1287 let mut rooms = self.active_rooms.lock().unwrap();
1288 let room = match rooms.get_mut(room_id) {
1289 Some(r) => r,
1290 None => return,
1291 };
1292 let newly_added = room.members.insert(sender_fingerprint.clone());
1293 if newly_added {
1294 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
1295 room_id: room_id.to_string(),
1296 fingerprint: sender_fingerprint.clone(),
1297 });
1298 }
1299 let _ = repo::upsert_room_member(
1304 &self.db,
1305 &StoredRoomMember {
1306 room_id: room_id.to_string(),
1307 peer_id: String::new(), fingerprint: sender_fingerprint.clone(),
1309 last_seen: Some(now_unix()),
1310 verified: false,
1311 ed25519_pubkey: sender_ed25519_pubkey.clone(),
1312 role: "member".into(),
1318 },
1319 );
1320 if let Some(name) = display_name.as_deref() {
1321 let _ = repo::set_member_display_name(
1322 &self.db,
1323 room_id,
1324 &sender_fingerprint,
1325 Some(name),
1326 );
1327 }
1328 room.info.encrypted && wrapped_session_key.is_some()
1329 };
1330
1331 if need_inbound {
1332 let wrapped = wrapped_session_key.unwrap();
1333 let result = {
1334 let mut rooms = self.active_rooms.lock().unwrap();
1335 let room = rooms.get_mut(room_id).unwrap();
1336 let passphrase_key = match &room.passphrase_key {
1337 Some(k) => k,
1338 None => {
1339 warn!("no passphrase key when receiving session key");
1340 return;
1341 }
1342 };
1343 match passphrase::unwrap(&wrapped, passphrase_key) {
1344 Ok(plain) => match String::from_utf8(plain) {
1345 Ok(key_b64) => {
1346 let crypto = room.crypto.as_mut().unwrap();
1347 crypto.add_inbound_session(&sender_fingerprint, &key_b64)
1348 }
1349 Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
1350 },
1351 Err(e) => Err(e),
1352 }
1353 };
1354 if let Err(e) = result {
1355 error!(%e, "add inbound session failed");
1356 }
1357 }
1358 }
1359 RoomMessage::SessionKeyRequest {
1360 requester_fingerprint,
1361 } => {
1362 if requester_fingerprint == our_fp {
1363 return;
1364 }
1365 if let Err(e) = self.broadcast_member_announce(room_id).await {
1367 warn!(%e, "broadcast member announce on request");
1368 }
1369 }
1370 RoomMessage::Encrypted {
1371 sender_fingerprint,
1372 session_id,
1373 ciphertext_b64,
1374 } => {
1375 if sender_fingerprint == our_fp {
1376 return;
1377 }
1378 let ct_bytes = match base64::Engine::decode(
1379 &base64::engine::general_purpose::STANDARD,
1380 &ciphertext_b64,
1381 ) {
1382 Ok(b) => b,
1383 Err(e) => {
1384 warn!(%e, "bad base64 ciphertext");
1385 return;
1386 }
1387 };
1388 let plaintext = {
1389 let mut rooms = self.active_rooms.lock().unwrap();
1390 let room = match rooms.get_mut(room_id) {
1391 Some(r) => r,
1392 None => return,
1393 };
1394 let crypto = match room.crypto.as_mut() {
1395 Some(c) => c,
1396 None => return,
1397 };
1398 crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
1399 };
1400 match plaintext {
1401 Ok(pt) => {
1402 let body = String::from_utf8_lossy(&pt).to_string();
1403 let sent_at = now_unix();
1404 let _ = repo::insert_room_message(
1405 &self.db,
1406 room_id,
1407 &sender_fingerprint,
1408 "in",
1409 &body,
1410 sent_at,
1411 );
1412 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1413 self.maybe_emit_mention(room_id, &body);
1414 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1415 room_id: room_id.to_string(),
1416 sender_fingerprint,
1417 body,
1418 sent_at,
1419 });
1420 }
1421 Err(e) => {
1422 debug!(%e, "decrypt failed (probably missing session key)");
1423 }
1424 }
1425 }
1426 RoomMessage::Plain {
1427 sender_fingerprint,
1428 body,
1429 } => {
1430 if sender_fingerprint == our_fp {
1431 return;
1432 }
1433 let sent_at = now_unix();
1434 let _ = repo::insert_room_message(
1435 &self.db,
1436 room_id,
1437 &sender_fingerprint,
1438 "in",
1439 &body,
1440 sent_at,
1441 );
1442 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1443 self.maybe_emit_mention(room_id, &body);
1444 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1445 room_id: room_id.to_string(),
1446 sender_fingerprint,
1447 body,
1448 sent_at,
1449 });
1450 }
1451 RoomMessage::Typing { sender_fingerprint } => {
1452 if sender_fingerprint == our_fp {
1453 return;
1454 }
1455 let expiry = now_unix() + TYPING_TTL_SECS;
1456 let mut rooms = self.active_rooms.lock().unwrap();
1457 if let Some(room) = rooms.get_mut(room_id) {
1458 room.typers.insert(sender_fingerprint, expiry);
1459 }
1460 drop(rooms);
1461 let _ = self.app_event_tx.send(AppEvent::TypingChanged {
1462 room_id: room_id.to_string(),
1463 });
1464 }
1465 RoomMessage::RotateRoomKey {
1466 rotator_fingerprint,
1467 new_salt,
1468 } => {
1469 if rotator_fingerprint == our_fp {
1470 return;
1471 }
1472 let _ = self.app_event_tx.send(AppEvent::RotationRequested {
1473 room_id: room_id.to_string(),
1474 rotator_fingerprint,
1475 new_salt,
1476 });
1477 }
1478 RoomMessage::MemberLeave { sender_fingerprint } => {
1479 if sender_fingerprint == our_fp {
1480 return;
1481 }
1482 let removed = {
1483 let mut rooms = self.active_rooms.lock().unwrap();
1484 if let Some(room) = rooms.get_mut(room_id) {
1485 room.members.remove(&sender_fingerprint)
1486 } else {
1487 false
1488 }
1489 };
1490 if removed {
1491 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
1492 room_id: room_id.to_string(),
1493 fingerprint: sender_fingerprint,
1494 });
1495 }
1496 }
1497 RoomMessage::FileOffer {
1498 sender_fingerprint,
1499 file_id,
1500 name,
1501 size_bytes,
1502 mime,
1503 chunk_count,
1504 encrypted_meta,
1505 } => {
1506 if sender_fingerprint == our_fp {
1507 return; }
1509 self.handle_file_offer(
1510 room_id,
1511 sender_fingerprint,
1512 file_id,
1513 name,
1514 size_bytes,
1515 mime,
1516 chunk_count,
1517 encrypted_meta,
1518 );
1519 }
1520 RoomMessage::FileChunk {
1521 sender_fingerprint,
1522 file_id,
1523 chunk_index,
1524 total_chunks,
1525 data_b64,
1526 } => {
1527 if sender_fingerprint == our_fp {
1528 return;
1529 }
1530 self.handle_file_chunk(
1531 room_id,
1532 sender_fingerprint,
1533 file_id,
1534 chunk_index,
1535 total_chunks,
1536 data_b64,
1537 );
1538 }
1539 RoomMessage::OwnerGrant {
1540 room_id: announced_room_id,
1541 target_fingerprint,
1542 } => {
1543 if announced_room_id != room_id {
1548 warn!(payload_room = %announced_room_id, topic_room = %room_id, "OwnerGrant room mismatch");
1549 return;
1550 }
1551 let signer = match verified_signer {
1552 Some(fp) => fp,
1553 None => {
1554 warn!(%room_id, "OwnerGrant arrived unsigned; dropping");
1555 return;
1556 }
1557 };
1558 if !self.is_owner(room_id, &signer) {
1559 warn!(%signer, %room_id, "OwnerGrant signer isn't an owner; dropping");
1560 return;
1561 }
1562 info!(%signer, %target_fingerprint, %room_id, "OwnerGrant applied");
1563 if let Err(e) =
1564 repo::set_member_role(&self.db, room_id, &target_fingerprint, "owner")
1565 {
1566 warn!(%e, "OwnerGrant: set_member_role failed");
1567 }
1568 }
1569 RoomMessage::BanMember {
1570 room_id: announced_room_id,
1571 target_fingerprint,
1572 } => {
1573 if announced_room_id != room_id {
1574 warn!(payload_room = %announced_room_id, topic_room = %room_id, "BanMember room mismatch");
1575 return;
1576 }
1577 let signer = match verified_signer {
1578 Some(fp) => fp,
1579 None => {
1580 warn!(%room_id, "BanMember arrived unsigned; dropping");
1581 return;
1582 }
1583 };
1584 if !self.is_owner(room_id, &signer) {
1585 warn!(%signer, %room_id, "BanMember signer isn't an owner; dropping");
1586 return;
1587 }
1588 if target_fingerprint == our_fp {
1589 info!(%room_id, %signer, "we were kicked from this room");
1595 self.active_rooms.lock().unwrap().remove(room_id);
1596 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
1597 room_id: room_id.to_string(),
1598 });
1599 return;
1600 }
1601 info!(%signer, %target_fingerprint, %room_id, "BanMember applied");
1602 if let Err(e) = repo::add_room_ban(
1603 &self.db,
1604 room_id,
1605 &target_fingerprint,
1606 &signer,
1607 "", now_unix(),
1609 ) {
1610 warn!(%e, "BanMember: add_room_ban failed");
1611 }
1612 self.evict_banned_member(room_id, &target_fingerprint);
1613 }
1614 RoomMessage::SasInit {
1615 tx_id,
1616 ephemeral_x25519_pubkey_b64,
1617 target_fingerprint,
1618 } => {
1619 if target_fingerprint != our_fp {
1620 return;
1625 }
1626 let signer = match verified_signer {
1627 Some(fp) => fp,
1628 None => {
1629 warn!("SasInit arrived unsigned; dropping");
1630 return;
1631 }
1632 };
1633 let their_pub =
1634 match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
1635 Ok(pk) => pk,
1636 Err(e) => {
1637 warn!(%e, "SasInit: bad x25519 pubkey");
1638 return;
1639 }
1640 };
1641 let tx_id_bytes = match B64.decode(&tx_id) {
1642 Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
1643 let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
1644 arr.copy_from_slice(&b);
1645 arr
1646 }
1647 _ => {
1648 warn!(%tx_id, "SasInit: bad tx_id length");
1649 return;
1650 }
1651 };
1652 let (_, our_secret, our_pub) = crate::crypto::sas::new_session();
1653 let sas_code =
1654 crate::crypto::sas::derive_sas_code(&our_secret, &their_pub, &tx_id_bytes);
1655 self.sas_flows.lock().unwrap().insert(
1656 tx_id.clone(),
1657 SasFlow {
1658 room_id: room_id.to_string(),
1659 partner_fingerprint: signer.clone(),
1660 our_secret,
1661 sas_code: Some(sas_code.clone()),
1662 our_confirmed: false,
1663 their_confirmed: false,
1664 },
1665 );
1666 let response = RoomMessage::SasResponse {
1669 tx_id: tx_id.clone(),
1670 ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
1671 };
1672 if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
1673 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
1674 self.network
1675 .publish_room_message(room_id.to_string(), bytes)
1676 .await;
1677 }
1678 }
1679 let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
1680 room_id: room_id.to_string(),
1681 partner_fingerprint: signer,
1682 tx_id,
1683 emoji_string: sas_code.emoji_string(),
1684 emoji_labels: sas_code.emoji_labels(),
1685 decimal: sas_code.decimal,
1686 });
1687 }
1688 RoomMessage::SasResponse {
1689 tx_id,
1690 ephemeral_x25519_pubkey_b64,
1691 } => {
1692 let signer = match verified_signer {
1693 Some(fp) => fp,
1694 None => {
1695 warn!("SasResponse arrived unsigned; dropping");
1696 return;
1697 }
1698 };
1699 let their_pub =
1700 match crate::crypto::sas::parse_pubkey(&ephemeral_x25519_pubkey_b64) {
1701 Ok(pk) => pk,
1702 Err(e) => {
1703 warn!(%e, "SasResponse: bad x25519 pubkey");
1704 return;
1705 }
1706 };
1707 let tx_id_bytes = match B64.decode(&tx_id) {
1708 Ok(b) if b.len() == crate::crypto::sas::TX_ID_LEN => {
1709 let mut arr = [0u8; crate::crypto::sas::TX_ID_LEN];
1710 arr.copy_from_slice(&b);
1711 arr
1712 }
1713 _ => return,
1714 };
1715 let emit = {
1716 let mut flows = self.sas_flows.lock().unwrap();
1717 let flow = match flows.get_mut(&tx_id) {
1718 Some(f) => f,
1719 None => {
1720 warn!(%tx_id, "SasResponse for unknown tx_id");
1721 return;
1722 }
1723 };
1724 if flow.partner_fingerprint != signer {
1725 warn!(
1726 expected = %flow.partner_fingerprint, got = %signer,
1727 "SasResponse signer doesn't match flow's partner; dropping"
1728 );
1729 return;
1730 }
1731 let code = crate::crypto::sas::derive_sas_code(
1732 &flow.our_secret,
1733 &their_pub,
1734 &tx_id_bytes,
1735 );
1736 flow.sas_code = Some(code.clone());
1737 code
1738 };
1739 let _ = self.app_event_tx.send(AppEvent::SasCodeReady {
1740 room_id: room_id.to_string(),
1741 partner_fingerprint: signer,
1742 tx_id,
1743 emoji_string: emit.emoji_string(),
1744 emoji_labels: emit.emoji_labels(),
1745 decimal: emit.decimal,
1746 });
1747 }
1748 RoomMessage::CodeJoinRequest {
1749 room_id: announced_room_id,
1750 joiner_x25519_pubkey_b64,
1751 code,
1752 } => {
1753 if announced_room_id != room_id {
1754 return;
1755 }
1756 let joiner_fp = match verified_signer {
1757 Some(fp) => fp,
1758 None => {
1759 warn!("CodeJoinRequest unsigned; dropping");
1760 return;
1761 }
1762 };
1763 let our_fp = self.identity.fingerprint().to_string();
1767 if !self.is_owner(room_id, &our_fp) {
1768 return;
1769 }
1770 let now = now_unix();
1772 let (code_ok, our_session_id, wrap_input) = {
1773 let mut rooms = self.active_rooms.lock().unwrap();
1774 let room = match rooms.get_mut(room_id) {
1775 Some(r) => r,
1776 None => return,
1777 };
1778 if room.passphrase_key.is_none() {
1779 warn!("CodeJoinRequest: no passphrase key locally; can't respond");
1780 return;
1781 }
1782 let original_len = room.issued_codes.len();
1783 room.issued_codes.retain(|(c, exp)| !(c == &code && *exp > now));
1784 let matched = room.issued_codes.len() < original_len;
1785 if !matched {
1786 info!(%joiner_fp, "CodeJoinRequest: code invalid or expired; ignoring");
1787 return;
1788 }
1789 let crypto = room.crypto.as_ref().unwrap();
1790 (
1791 true,
1792 crypto.our_session_id(),
1793 crypto.our_session_key_b64(),
1794 )
1795 };
1796 let _ = code_ok;
1797 let their_pub = match crate::crypto::sas::parse_pubkey(&joiner_x25519_pubkey_b64) {
1799 Ok(pk) => pk,
1800 Err(e) => {
1801 warn!(%e, "CodeJoinRequest: bad pubkey");
1802 return;
1803 }
1804 };
1805 use x25519_dalek::{PublicKey, StaticSecret};
1806 let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
1807 let our_pub = PublicKey::from(&our_secret);
1808 let shared = our_secret.diffie_hellman(&their_pub);
1809 let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
1811 let mut wrap_key = [0u8; passphrase::KEY_LEN];
1812 hk.expand(b"huddle-code-join-v1", &mut wrap_key)
1813 .expect("32 bytes is within HKDF limits");
1814 let wrapped = match passphrase::wrap(wrap_input.as_bytes(), &wrap_key) {
1817 Ok(w) => w,
1818 Err(e) => {
1819 warn!(%e, "CodeJoinRequest: wrap failed");
1820 return;
1821 }
1822 };
1823 let response = RoomMessage::CodeJoinResponse {
1824 room_id: room_id.to_string(),
1825 target_fingerprint: joiner_fp.clone(),
1826 owner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
1827 owner_session_id: our_session_id,
1828 wrapped_session_key_b64: wrapped,
1829 nonce_b64: String::new(), };
1831 if let Ok(env) = crate::crypto::sign_message(&self.identity, &response) {
1832 if let Ok(bytes) = crate::network::protocol::encode_wire_signed(&env) {
1833 self.network
1834 .publish_room_message(room_id.to_string(), bytes)
1835 .await;
1836 }
1837 }
1838 info!(%joiner_fp, %room_id, "issued CodeJoinResponse");
1839 }
1840 RoomMessage::CodeJoinResponse {
1841 room_id: announced_room_id,
1842 target_fingerprint,
1843 owner_x25519_pubkey_b64,
1844 owner_session_id,
1845 wrapped_session_key_b64,
1846 nonce_b64: _,
1847 } => {
1848 if announced_room_id != room_id || target_fingerprint != our_fp {
1849 return;
1850 }
1851 let owner_fp = match verified_signer {
1852 Some(fp) => fp,
1853 None => {
1854 warn!("CodeJoinResponse unsigned; dropping");
1855 return;
1856 }
1857 };
1858 let our_secret = match self
1859 .pending_code_secrets
1860 .lock()
1861 .unwrap()
1862 .remove(room_id)
1863 {
1864 Some(s) => s,
1865 None => {
1866 warn!(%room_id, "CodeJoinResponse with no pending code-join state");
1867 return;
1868 }
1869 };
1870 let owner_pub = match crate::crypto::sas::parse_pubkey(&owner_x25519_pubkey_b64) {
1871 Ok(pk) => pk,
1872 Err(e) => {
1873 warn!(%e, "CodeJoinResponse: bad owner pubkey");
1874 return;
1875 }
1876 };
1877 let shared = our_secret.diffie_hellman(&owner_pub);
1878 let hk = hkdf::Hkdf::<sha2::Sha256>::new(None, shared.as_bytes());
1879 let mut wrap_key = [0u8; passphrase::KEY_LEN];
1880 hk.expand(b"huddle-code-join-v1", &mut wrap_key)
1881 .expect("32 bytes within HKDF limits");
1882 let session_key_bytes =
1883 match passphrase::unwrap(&wrapped_session_key_b64, &wrap_key) {
1884 Ok(b) => b,
1885 Err(e) => {
1886 warn!(%e, "CodeJoinResponse: unwrap failed");
1887 return;
1888 }
1889 };
1890 let session_key_str = match String::from_utf8(session_key_bytes) {
1891 Ok(s) => s,
1892 Err(e) => {
1893 warn!(%e, "CodeJoinResponse: session key wasn't valid utf8");
1894 return;
1895 }
1896 };
1897 let mut rooms = self.active_rooms.lock().unwrap();
1899 if let Some(room) = rooms.get_mut(room_id) {
1900 if let Some(crypto) = room.crypto.as_mut() {
1901 if let Err(e) =
1902 crypto.add_inbound_session(&owner_fp, &session_key_str)
1903 {
1904 warn!(%e, "CodeJoinResponse: add_inbound_session failed");
1905 } else {
1906 info!(%room_id, %owner_fp, %owner_session_id, "code-join completed; can decrypt owner's messages");
1907 room.members.insert(owner_fp.clone());
1908 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
1909 room_id: room_id.to_string(),
1910 fingerprint: owner_fp,
1911 });
1912 }
1913 }
1914 }
1915 }
1916 RoomMessage::JoinRefused {
1917 room_id: announced_room_id,
1918 target_fingerprint,
1919 reason,
1920 } => {
1921 if announced_room_id != room_id || target_fingerprint != our_fp {
1922 return;
1923 }
1924 let _ = self.app_event_tx.send(AppEvent::Error {
1928 description: format!("join refused: {reason}"),
1929 });
1930 }
1931 RoomMessage::SasConfirm { tx_id, matched } => {
1932 let signer = match verified_signer {
1933 Some(fp) => fp,
1934 None => return,
1935 };
1936 let (room_id_done, partner_fp_done, both_done) = {
1937 let mut flows = self.sas_flows.lock().unwrap();
1938 let flow = match flows.get_mut(&tx_id) {
1939 Some(f) => f,
1940 None => return,
1941 };
1942 if flow.partner_fingerprint != signer {
1943 return;
1944 }
1945 if !matched {
1946 let _ = flow;
1948 flows.remove(&tx_id);
1949 return;
1950 }
1951 flow.their_confirmed = true;
1952 if flow.our_confirmed && flow.their_confirmed {
1953 (
1954 Some(flow.room_id.clone()),
1955 Some(flow.partner_fingerprint.clone()),
1956 true,
1957 )
1958 } else {
1959 (None, None, false)
1960 }
1961 };
1962 if both_done {
1963 if let (Some(rid), Some(pfp)) = (room_id_done, partner_fp_done) {
1964 if let Err(e) = self.finish_sas(&tx_id, &rid, &pfp).await {
1965 warn!(%e, "finish_sas failed");
1966 }
1967 }
1968 }
1969 }
1970 }
1971 }
1972
1973 pub async fn send_file(&self, room_id: &str, path: &Path) -> Result<String> {
1981 let bytes = std::fs::read(path)?;
1982 let name = path
1983 .file_name()
1984 .map(|n| n.to_string_lossy().to_string())
1985 .unwrap_or_else(|| "untitled".into());
1986 let mime = crate::files::guess_mime(&name);
1987 let original_path = path.to_path_buf();
1988
1989 let (room_encrypted, mut maybe_session_id, encrypted_meta_opt, wire_bytes) = {
1990 let mut rooms = self.active_rooms.lock().unwrap();
1991 let room = rooms
1992 .get_mut(room_id)
1993 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1994 if room.info.encrypted {
1995 let crypto = room
1996 .crypto
1997 .as_mut()
1998 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
1999 let (ciphertext, meta) = file_encryption::encrypt_file(&bytes, crypto)?;
2000 (true, Some(meta.megolm_session_id.clone()), Some(meta), ciphertext)
2001 } else {
2002 (false, None, None, bytes)
2003 }
2004 };
2005 let _ = &mut maybe_session_id; let plan =
2008 self.file_manager
2009 .prepare_outgoing_from_bytes(&name, mime.clone(), wire_bytes)?;
2010 let file_id = plan.file_id.clone();
2011 let total = plan.chunks.len() as u32;
2012 let our_fp = self.identity.fingerprint().to_string();
2013
2014 let attachment = StoredAttachment {
2015 id: 0,
2016 room_id: room_id.to_string(),
2017 message_id: None,
2018 sender_fingerprint: our_fp.clone(),
2019 file_id: file_id.clone(),
2020 name: name.clone(),
2021 mime: mime.clone(),
2022 size_bytes: plan.size_bytes as i64,
2023 status: AttachmentStatus::Ready,
2024 cache_path: Some(self.file_manager.cache_path(&file_id).to_string_lossy().into()),
2025 saved_path: Some(original_path.to_string_lossy().into()),
2026 error: None,
2027 encrypted: room_encrypted,
2028 wrapped_key: encrypted_meta_opt.as_ref().map(|m| m.wrapped_key_b64.clone()),
2029 nonce: encrypted_meta_opt.as_ref().map(|m| m.nonce_b64.clone()),
2030 megolm_session_id: encrypted_meta_opt
2031 .as_ref()
2032 .map(|m| m.megolm_session_id.clone()),
2033 content_hash: encrypted_meta_opt.as_ref().map(|m| m.content_hash.clone()),
2034 created_at: now_unix(),
2035 };
2036 repo::upsert_attachment(&self.db, &attachment)?;
2037 let _ = self.app_event_tx.send(AppEvent::FileOffered {
2038 room_id: room_id.to_string(),
2039 file_id: file_id.clone(),
2040 name: name.clone(),
2041 size_bytes: plan.size_bytes,
2042 sender_fingerprint: our_fp.clone(),
2043 });
2044
2045 let offer = RoomMessage::FileOffer {
2047 sender_fingerprint: our_fp.clone(),
2048 file_id: file_id.clone(),
2049 name,
2050 size_bytes: plan.size_bytes,
2051 mime,
2052 chunk_count: total,
2053 encrypted_meta: encrypted_meta_opt,
2054 };
2055 if let Ok(bytes) = encode_wire(&offer) {
2056 self.network
2057 .publish_room_message(room_id.to_string(), bytes)
2058 .await;
2059 }
2060
2061 let net = self.network.clone();
2064 let room = room_id.to_string();
2065 let our = our_fp.clone();
2066 let fid = file_id.clone();
2067 let chunks = plan.chunks.clone();
2068 tokio::spawn(async move {
2069 for (i, data) in chunks.iter().enumerate() {
2070 let msg = RoomMessage::FileChunk {
2071 sender_fingerprint: our.clone(),
2072 file_id: fid.clone(),
2073 chunk_index: i as u32,
2074 total_chunks: total,
2075 data_b64: B64.encode(data),
2076 };
2077 if let Ok(bytes) = encode_wire(&msg) {
2078 net.publish_room_message(room.clone(), bytes).await;
2079 }
2080 tokio::time::sleep(Duration::from_millis(40)).await;
2081 }
2082 });
2083
2084 Ok(file_id)
2085 }
2086
2087 pub async fn save_to_downloads(&self, room_id: &str, file_id: &str) -> Result<PathBuf> {
2090 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
2091 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
2092 if !matches!(
2093 attachment.status,
2094 AttachmentStatus::Ready | AttachmentStatus::Saved
2095 ) {
2096 return Err(HuddleError::Other(format!(
2097 "attachment is not ready (status={})",
2098 attachment.status.as_str()
2099 )));
2100 }
2101 let plaintext = if attachment.encrypted
2106 && attachment.sender_fingerprint == self.identity.fingerprint()
2107 {
2108 match attachment
2109 .saved_path
2110 .as_deref()
2111 .filter(|p| Path::new(p).exists())
2112 {
2113 Some(src) => std::fs::read(src)?,
2114 None => {
2115 return Err(HuddleError::Other(
2116 "your original file has moved or been deleted — it can't be \
2117 recovered from the encrypted cache"
2118 .into(),
2119 ));
2120 }
2121 }
2122 } else {
2123 let cached = self.file_manager.read_cache(file_id)?;
2124 if attachment.encrypted {
2125 let meta = EncryptedFileMeta {
2126 megolm_session_id: attachment
2127 .megolm_session_id
2128 .clone()
2129 .ok_or_else(|| HuddleError::Other("missing megolm_session_id".into()))?,
2130 wrapped_key_b64: attachment
2131 .wrapped_key
2132 .clone()
2133 .ok_or_else(|| HuddleError::Other("missing wrapped_key".into()))?,
2134 nonce_b64: attachment
2135 .nonce
2136 .clone()
2137 .ok_or_else(|| HuddleError::Other("missing nonce".into()))?,
2138 content_hash: attachment
2139 .content_hash
2140 .clone()
2141 .ok_or_else(|| HuddleError::Other("missing content_hash".into()))?,
2142 };
2143 self.decrypt_attachment(
2144 room_id,
2145 &attachment.sender_fingerprint,
2146 &cached,
2147 &meta,
2148 )?
2149 } else {
2150 cached
2151 }
2152 };
2153 let saved = self.file_manager.write_to_downloads(&attachment.name, &plaintext)?;
2154 repo::update_attachment_paths(
2155 &self.db,
2156 room_id,
2157 file_id,
2158 None,
2159 Some(&saved.to_string_lossy()),
2160 )?;
2161 repo::update_attachment_status(&self.db, room_id, file_id, AttachmentStatus::Saved, None)?;
2162 let _ = self.app_event_tx.send(AppEvent::FileSaved {
2163 file_id: file_id.into(),
2164 path: saved.to_string_lossy().into(),
2165 });
2166 Ok(saved)
2167 }
2168
2169 pub async fn cancel_transfer(&self, room_id: &str, file_id: &str) -> Result<()> {
2171 self.file_manager.cancel_incoming(file_id);
2172 repo::update_attachment_status(
2173 &self.db,
2174 room_id,
2175 file_id,
2176 AttachmentStatus::Cancelled,
2177 None,
2178 )?;
2179 Ok(())
2180 }
2181
2182 pub fn open_saved(&self, room_id: &str, file_id: &str) -> Result<()> {
2184 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
2185 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
2186 let path = attachment
2187 .saved_path
2188 .ok_or_else(|| HuddleError::Other("not saved yet — press Enter to save first".into()))?;
2189 open_with_system(&path)
2190 }
2191
2192 pub fn list_room_attachments(&self, room_id: &str) -> Result<Vec<StoredAttachment>> {
2193 repo::list_room_attachments(&self.db, room_id)
2194 }
2195
2196 pub fn set_member_verified(
2200 &self,
2201 room_id: &str,
2202 fingerprint: &str,
2203 verified: bool,
2204 ) -> Result<()> {
2205 let members = repo::list_room_members(&self.db, room_id).unwrap_or_default();
2210 if !members.iter().any(|m| m.fingerprint == fingerprint) {
2211 repo::upsert_room_member(
2212 &self.db,
2213 &StoredRoomMember {
2214 room_id: room_id.to_string(),
2215 peer_id: String::new(),
2216 fingerprint: fingerprint.to_string(),
2217 last_seen: Some(now_unix()),
2218 verified,
2219 ed25519_pubkey: None,
2220 role: "member".into(),
2221 },
2222 )?;
2223 }
2224 repo::set_member_verified(&self.db, room_id, fingerprint, verified)
2225 }
2226
2227 pub fn verified_fingerprints(&self, room_id: &str) -> Vec<String> {
2228 repo::list_verified_fingerprints(&self.db, room_id).unwrap_or_default()
2229 }
2230
2231 pub fn is_owner(&self, room_id: &str, fingerprint: &str) -> bool {
2234 repo::list_room_owners(&self.db, room_id)
2235 .unwrap_or_default()
2236 .iter()
2237 .any(|fp| fp == fingerprint)
2238 }
2239
2240 pub fn we_are_owner(&self, room_id: &str) -> bool {
2241 self.is_owner(room_id, &self.identity.fingerprint().to_string())
2242 }
2243
2244 pub fn room_owners(&self, room_id: &str) -> Vec<String> {
2247 repo::list_room_owners(&self.db, room_id).unwrap_or_default()
2248 }
2249
2250 pub fn verified_only_inbound(&self) -> bool {
2253 repo::get_setting(&self.db, "verified_only_inbound")
2254 .unwrap_or(None)
2255 .map(|v| v == "1")
2256 .unwrap_or(false)
2257 }
2258
2259 pub fn set_verified_only_inbound(&self, on: bool) -> Result<()> {
2260 repo::set_setting(&self.db, "verified_only_inbound", if on { "1" } else { "0" })
2261 }
2262
2263 pub fn room_verified_only(&self, room_id: &str) -> bool {
2268 repo::get_room_verified_only(&self.db, room_id).unwrap_or(false)
2269 }
2270
2271 pub fn set_room_verified_only(&self, room_id: &str, on: bool) -> Result<()> {
2272 repo::set_room_verified_only(&self.db, room_id, on)
2273 }
2274
2275 pub fn onboarding_seen(&self) -> bool {
2277 repo::is_onboarding_seen(&self.db).unwrap_or(true)
2278 }
2279
2280 pub fn mark_onboarding_seen(&self) -> Result<()> {
2281 repo::mark_onboarding_seen(&self.db)
2282 }
2283
2284 pub async fn grant_owner(&self, room_id: &str, target_fingerprint: &str) -> Result<()> {
2288 let our_fp = self.identity.fingerprint().to_string();
2289 if !self.is_owner(room_id, &our_fp) {
2290 return Err(HuddleError::Other(
2291 "only an owner can grant owner".into(),
2292 ));
2293 }
2294 let msg = RoomMessage::OwnerGrant {
2295 room_id: room_id.to_string(),
2296 target_fingerprint: target_fingerprint.to_string(),
2297 };
2298 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2299 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2300 self.network
2301 .publish_room_message(room_id.to_string(), bytes)
2302 .await;
2303 repo::set_member_role(&self.db, room_id, target_fingerprint, "owner")?;
2305 Ok(())
2306 }
2307
2308 pub async fn kick_member(
2319 &self,
2320 room_id: &str,
2321 target_fingerprint: &str,
2322 ) -> Result<String> {
2323 let our_fp = self.identity.fingerprint().to_string();
2324 if !self.is_owner(room_id, &our_fp) {
2325 return Err(HuddleError::Other("only an owner can kick".into()));
2326 }
2327 if target_fingerprint == our_fp {
2328 return Err(HuddleError::Other("can't kick yourself".into()));
2329 }
2330 let info = self
2331 .active_rooms
2332 .lock()
2333 .unwrap()
2334 .get(room_id)
2335 .map(|r| r.info.clone())
2336 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2337 if !info.encrypted {
2338 let msg = RoomMessage::BanMember {
2342 room_id: room_id.to_string(),
2343 target_fingerprint: target_fingerprint.to_string(),
2344 };
2345 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2346 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2347 self.network
2348 .publish_room_message(room_id.to_string(), bytes)
2349 .await;
2350 repo::add_room_ban(
2351 &self.db,
2352 room_id,
2353 target_fingerprint,
2354 &our_fp,
2355 &env.signature_b64,
2356 now_unix(),
2357 )?;
2358 self.evict_banned_member(room_id, target_fingerprint);
2359 return Ok(String::new());
2360 }
2361 let new_passphrase = generate_join_passphrase();
2363 let msg = RoomMessage::BanMember {
2364 room_id: room_id.to_string(),
2365 target_fingerprint: target_fingerprint.to_string(),
2366 };
2367 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2368 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2369 self.network
2370 .publish_room_message(room_id.to_string(), bytes)
2371 .await;
2372 repo::add_room_ban(
2373 &self.db,
2374 room_id,
2375 target_fingerprint,
2376 &our_fp,
2377 &env.signature_b64,
2378 now_unix(),
2379 )?;
2380 self.evict_banned_member(room_id, target_fingerprint);
2381 self.rotate_room(room_id, &new_passphrase).await?;
2384 Ok(new_passphrase)
2385 }
2386
2387 pub fn generate_join_code(&self, room_id: &str) -> Result<String> {
2394 let our_fp = self.identity.fingerprint().to_string();
2395 if !self.is_owner(room_id, &our_fp) {
2396 return Err(HuddleError::Other(
2397 "only an owner can issue join codes".into(),
2398 ));
2399 }
2400 let code = generate_alphanumeric_code(8);
2401 let expires_at = now_unix() + 10 * 60;
2402 let mut rooms = self.active_rooms.lock().unwrap();
2403 let room = rooms
2404 .get_mut(room_id)
2405 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2406 let now = now_unix();
2408 room.issued_codes.retain(|(_, exp)| *exp > now);
2409 room.issued_codes.push((code.clone(), expires_at));
2410 Ok(code)
2411 }
2412
2413 pub async fn join_room_with_code(
2420 &self,
2421 room_id: &str,
2422 code: &str,
2423 ) -> Result<()> {
2424 let info = {
2426 let d = self.discovered_rooms.lock().unwrap().get(room_id).cloned();
2427 match d {
2428 Some(d) => StoredRoom {
2429 id: room_id.to_string(),
2430 name: d.name,
2431 creator_fingerprint: d.creator_fingerprint,
2432 encrypted: d.encrypted,
2433 passphrase_salt: None, created_at: now_unix(),
2435 last_active: Some(now_unix()),
2436 },
2437 None => {
2438 return Err(HuddleError::Other(format!(
2439 "room {room_id} not visible — wait for an announcement"
2440 )))
2441 }
2442 }
2443 };
2444 if !info.encrypted {
2445 return Err(HuddleError::Other(
2446 "code-join only applies to encrypted rooms".into(),
2447 ));
2448 }
2449 let our_fp = self.identity.fingerprint().to_string();
2450 use x25519_dalek::{PublicKey, StaticSecret};
2453 let our_secret = StaticSecret::random_from_rng(rand::thread_rng());
2454 let our_pub = PublicKey::from(&our_secret);
2455 self.pending_code_secrets
2458 .lock()
2459 .unwrap()
2460 .insert(room_id.to_string(), our_secret);
2461 self.active_rooms.lock().unwrap().insert(
2464 room_id.to_string(),
2465 ActiveRoom {
2466 info: info.clone(),
2467 crypto: Some(RoomCrypto::new_for_room(
2468 self.db.clone(),
2469 room_id.to_string(),
2470 our_fp.clone(),
2471 self.session_persist_key,
2472 )?),
2473 passphrase_key: None,
2474 members: {
2475 let mut s = HashSet::new();
2476 s.insert(our_fp.clone());
2477 s
2478 },
2479 typers: HashMap::new(),
2480 read_only: true,
2481 issued_codes: Vec::new(),
2482 },
2483 );
2484 self.network.subscribe_room(room_id.to_string()).await;
2485 let req = RoomMessage::CodeJoinRequest {
2487 room_id: room_id.to_string(),
2488 joiner_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2489 code: code.to_string(),
2490 };
2491 let env = crate::crypto::sign_message(&self.identity, &req)?;
2492 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2493 self.network
2494 .publish_room_message(room_id.to_string(), bytes)
2495 .await;
2496 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
2499 room_id: room_id.to_string(),
2500 });
2501 Ok(())
2502 }
2503
2504 pub async fn sas_start(&self, room_id: &str, target_fingerprint: &str) -> Result<String> {
2510 let (tx_id_bytes, our_secret, our_pub) = crate::crypto::sas::new_session();
2511 let tx_id = B64.encode(tx_id_bytes);
2512 let msg = RoomMessage::SasInit {
2513 tx_id: tx_id.clone(),
2514 ephemeral_x25519_pubkey_b64: B64.encode(our_pub.as_bytes()),
2515 target_fingerprint: target_fingerprint.to_string(),
2516 };
2517 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2518 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2519 self.sas_flows.lock().unwrap().insert(
2520 tx_id.clone(),
2521 SasFlow {
2522 room_id: room_id.to_string(),
2523 partner_fingerprint: target_fingerprint.to_string(),
2524 our_secret,
2525 sas_code: None,
2526 our_confirmed: false,
2527 their_confirmed: false,
2528 },
2529 );
2530 self.network
2531 .publish_room_message(room_id.to_string(), bytes)
2532 .await;
2533 Ok(tx_id)
2534 }
2535
2536 pub async fn sas_match(&self, tx_id: &str) -> Result<()> {
2540 let (room_id, partner_fp, both_done) = {
2541 let mut flows = self.sas_flows.lock().unwrap();
2542 let flow = flows
2543 .get_mut(tx_id)
2544 .ok_or_else(|| HuddleError::Other("unknown SAS tx_id".into()))?;
2545 flow.our_confirmed = true;
2546 (
2547 flow.room_id.clone(),
2548 flow.partner_fingerprint.clone(),
2549 flow.our_confirmed && flow.their_confirmed,
2550 )
2551 };
2552 let msg = RoomMessage::SasConfirm {
2553 tx_id: tx_id.to_string(),
2554 matched: true,
2555 };
2556 let env = crate::crypto::sign_message(&self.identity, &msg)?;
2557 let bytes = crate::network::protocol::encode_wire_signed(&env)?;
2558 self.network
2559 .publish_room_message(room_id.clone(), bytes)
2560 .await;
2561 if both_done {
2562 self.finish_sas(tx_id, &room_id, &partner_fp).await?;
2563 }
2564 Ok(())
2565 }
2566
2567 pub fn sas_cancel(&self, tx_id: &str) {
2571 self.sas_flows.lock().unwrap().remove(tx_id);
2572 }
2573
2574 async fn finish_sas(
2577 &self,
2578 tx_id: &str,
2579 room_id: &str,
2580 partner_fingerprint: &str,
2581 ) -> Result<()> {
2582 repo::set_member_verified(&self.db, room_id, partner_fingerprint, true)?;
2583 repo::add_verified_peer(&self.db, partner_fingerprint, now_unix())?;
2584 self.sas_flows.lock().unwrap().remove(tx_id);
2585 let _ = self.app_event_tx.send(AppEvent::SasVerified {
2586 room_id: room_id.to_string(),
2587 partner_fingerprint: partner_fingerprint.to_string(),
2588 });
2589 Ok(())
2590 }
2591
2592 fn evict_banned_member(&self, room_id: &str, fingerprint: &str) {
2597 if let Some(room) = self.active_rooms.lock().unwrap().get_mut(room_id) {
2598 room.members.remove(fingerprint);
2599 }
2600 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
2601 room_id: room_id.to_string(),
2602 fingerprint: fingerprint.to_string(),
2603 });
2604 }
2605
2606 pub fn display_name(&self) -> Option<String> {
2607 repo::get_display_name(&self.db).unwrap_or(None)
2608 }
2609
2610 pub fn set_display_name(&self, name: Option<&str>) -> Result<()> {
2611 repo::set_display_name(&self.db, name)
2612 }
2613
2614 pub fn lookup_member_display_name(&self, fingerprint: &str) -> Option<String> {
2616 repo::lookup_display_name(&self.db, fingerprint).unwrap_or(None)
2617 }
2618
2619 pub fn is_room_muted(&self, room_id: &str) -> bool {
2620 repo::is_room_muted(&self.db, room_id).unwrap_or(false)
2621 }
2622
2623 pub fn set_room_muted(&self, room_id: &str, muted: bool) -> Result<()> {
2624 repo::set_room_muted(&self.db, room_id, muted)
2625 }
2626
2627 pub async fn broadcast_typing(&self, room_id: &str) {
2630 if !self.active_rooms.lock().unwrap().contains_key(room_id) {
2631 return;
2632 }
2633 let msg = RoomMessage::Typing {
2634 sender_fingerprint: self.identity.fingerprint().to_string(),
2635 };
2636 if let Ok(bytes) = encode_wire(&msg) {
2637 self.network
2638 .publish_room_message(room_id.to_string(), bytes)
2639 .await;
2640 }
2641 }
2642
2643 pub fn typers_in_room(&self, room_id: &str) -> Vec<String> {
2646 let now = now_unix();
2647 let mut rooms = self.active_rooms.lock().unwrap();
2648 let room = match rooms.get_mut(room_id) {
2649 Some(r) => r,
2650 None => return Vec::new(),
2651 };
2652 room.typers.retain(|_, exp| *exp > now);
2653 let mut v: Vec<String> = room.typers.keys().cloned().collect();
2654 v.sort();
2655 v
2656 }
2657
2658 pub async fn rotate_room(&self, room_id: &str, new_passphrase: &str) -> Result<()> {
2668 if new_passphrase.is_empty() {
2669 return Err(HuddleError::Other("new passphrase is empty".into()));
2670 }
2671 let new_salt = passphrase::random_salt();
2672 let new_key = passphrase::derive_key(new_passphrase, &new_salt)?;
2673
2674 let info = {
2675 let mut rooms = self.active_rooms.lock().unwrap();
2676 let room = rooms
2677 .get_mut(room_id)
2678 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2679 if !room.info.encrypted {
2680 return Err(HuddleError::Other(
2681 "rotation only applies to encrypted rooms".into(),
2682 ));
2683 }
2684 let new_crypto = RoomCrypto::new_for_room(
2686 self.db.clone(),
2687 room_id.to_string(),
2688 self.identity.fingerprint().to_string(),
2689 self.session_persist_key,
2690 )?;
2691 room.crypto = Some(new_crypto);
2692 room.passphrase_key = Some(new_key);
2693 room.info.passphrase_salt = Some(new_salt.to_vec());
2694 room.info.clone()
2695 };
2696
2697 let rot = RoomMessage::RotateRoomKey {
2703 rotator_fingerprint: self.identity.fingerprint().to_string(),
2704 new_salt: new_salt.to_vec(),
2705 };
2706 if let Ok(bytes) = encode_wire(&rot) {
2707 self.network
2708 .publish_room_message(room_id.to_string(), bytes)
2709 .await;
2710 }
2711 if let Err(e) = self.broadcast_member_announce(room_id).await {
2713 warn!(%e, "rotate: broadcast announce failed");
2714 }
2715
2716 repo::insert_room(&self.db, &info)?;
2718 Ok(())
2719 }
2720
2721 pub async fn accept_rotation(
2725 &self,
2726 room_id: &str,
2727 new_salt: &[u8],
2728 new_passphrase: &str,
2729 ) -> Result<()> {
2730 let new_key = passphrase::derive_key(new_passphrase, new_salt)?;
2731 let info = {
2732 let mut rooms = self.active_rooms.lock().unwrap();
2733 let room = rooms
2734 .get_mut(room_id)
2735 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
2736 room.passphrase_key = Some(new_key);
2737 room.info.passphrase_salt = Some(new_salt.to_vec());
2738 room.info.clone()
2739 };
2740 let req = RoomMessage::SessionKeyRequest {
2744 requester_fingerprint: self.identity.fingerprint().to_string(),
2745 };
2746 if let Ok(bytes) = encode_wire(&req) {
2747 self.network
2748 .publish_room_message(room_id.to_string(), bytes)
2749 .await;
2750 }
2751 repo::insert_room(&self.db, &info)?;
2752 Ok(())
2753 }
2754
2755 #[allow(clippy::too_many_arguments)]
2760 fn handle_file_offer(
2761 &self,
2762 room_id: &str,
2763 sender_fingerprint: String,
2764 file_id: String,
2765 name: String,
2766 size_bytes: u64,
2767 mime: Option<String>,
2768 _chunk_count: u32,
2769 encrypted_meta: Option<EncryptedFileMeta>,
2770 ) {
2771 let encrypted = encrypted_meta.is_some();
2772 let attachment = StoredAttachment {
2773 id: 0,
2774 room_id: room_id.to_string(),
2775 message_id: None,
2776 sender_fingerprint: sender_fingerprint.clone(),
2777 file_id: file_id.clone(),
2778 name: name.clone(),
2779 mime,
2780 size_bytes: size_bytes as i64,
2781 status: AttachmentStatus::Offered,
2782 cache_path: None,
2783 saved_path: None,
2784 error: None,
2785 encrypted,
2786 wrapped_key: encrypted_meta.as_ref().map(|m| m.wrapped_key_b64.clone()),
2787 nonce: encrypted_meta.as_ref().map(|m| m.nonce_b64.clone()),
2788 megolm_session_id: encrypted_meta.as_ref().map(|m| m.megolm_session_id.clone()),
2789 content_hash: encrypted_meta.as_ref().map(|m| m.content_hash.clone()),
2790 created_at: now_unix(),
2791 };
2792 if let Err(e) = repo::upsert_attachment(&self.db, &attachment) {
2793 warn!(%e, "upsert attachment");
2794 return;
2795 }
2796 self.file_manager.set_expected_size(&file_id, size_bytes);
2799 let _ = self.app_event_tx.send(AppEvent::FileOffered {
2800 room_id: room_id.to_string(),
2801 file_id,
2802 name,
2803 size_bytes,
2804 sender_fingerprint,
2805 });
2806 }
2807
2808 fn handle_file_chunk(
2809 &self,
2810 room_id: &str,
2811 _sender_fingerprint: String,
2812 file_id: String,
2813 chunk_index: u32,
2814 total_chunks: u32,
2815 data_b64: String,
2816 ) {
2817 let data = match B64.decode(&data_b64) {
2818 Ok(d) => d,
2819 Err(e) => {
2820 warn!(%e, "bad chunk base64");
2821 return;
2822 }
2823 };
2824 let expected_size = match repo::get_attachment(&self.db, room_id, &file_id) {
2828 Ok(Some(a)) => {
2829 if matches!(
2830 a.status,
2831 AttachmentStatus::Cancelled | AttachmentStatus::Failed
2832 ) {
2833 return;
2834 }
2835 a.size_bytes as u64
2836 }
2837 Ok(None) => crate::files::MAX_FILE_SIZE,
2838 Err(e) => {
2839 warn!(%e, "get attachment for chunk");
2840 crate::files::MAX_FILE_SIZE
2841 }
2842 };
2843
2844 let result = self.file_manager.accept_chunk(
2845 &file_id,
2846 chunk_index,
2847 total_chunks,
2848 data,
2849 expected_size,
2850 );
2851 match result {
2852 Ok(None) => {
2853 let _ = repo::update_attachment_status(
2855 &self.db,
2856 room_id,
2857 &file_id,
2858 AttachmentStatus::Downloading,
2859 None,
2860 );
2861 let bytes_so_far = self
2864 .file_manager
2865 .progress(&file_id)
2866 .map(|(b, _)| b)
2867 .unwrap_or(0);
2868 let _ = self.app_event_tx.send(AppEvent::FileProgress {
2869 file_id: file_id.clone(),
2870 bytes_received: bytes_so_far,
2871 total_bytes: expected_size,
2872 });
2873 }
2874 Ok(Some(completed)) => {
2875 let _ = repo::update_attachment_paths(
2876 &self.db,
2877 room_id,
2878 &file_id,
2879 Some(&completed.cache_path.to_string_lossy()),
2880 None,
2881 );
2882 let _ = repo::update_attachment_status(
2883 &self.db,
2884 room_id,
2885 &file_id,
2886 AttachmentStatus::Ready,
2887 None,
2888 );
2889 let _ = self.app_event_tx.send(AppEvent::FileReady {
2890 file_id: file_id.clone(),
2891 });
2892 }
2893 Err(e) => {
2894 let msg = e.to_string();
2895 warn!(%msg, "chunk processing failed");
2896 let _ = repo::update_attachment_status(
2897 &self.db,
2898 room_id,
2899 &file_id,
2900 AttachmentStatus::Failed,
2901 Some(&msg),
2902 );
2903 let _ = self.app_event_tx.send(AppEvent::FileFailed {
2904 file_id: file_id.clone(),
2905 reason: msg,
2906 });
2907 }
2908 }
2909 }
2910
2911 fn maybe_emit_mention(&self, room_id: &str, body: &str) {
2914 let full = self.identity.fingerprint().to_lowercase();
2915 let short: &str = full.split('-').next().unwrap_or(&full);
2917 let lower = body.to_lowercase();
2918 let hit = lower.contains(full.as_str())
2922 || lower
2923 .split(|c: char| !c.is_ascii_hexdigit())
2924 .any(|tok| tok == short);
2925 if hit {
2926 let _ = self.app_event_tx.send(AppEvent::MentionReceived {
2927 room_id: room_id.to_string(),
2928 body: body.to_string(),
2929 });
2930 }
2931 }
2932
2933 fn decrypt_attachment(
2934 &self,
2935 room_id: &str,
2936 sender_fingerprint: &str,
2937 ciphertext: &[u8],
2938 meta: &EncryptedFileMeta,
2939 ) -> Result<Vec<u8>> {
2940 let mut rooms = self.active_rooms.lock().unwrap();
2941 let room = rooms
2942 .get_mut(room_id)
2943 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
2944 let crypto = room
2945 .crypto
2946 .as_mut()
2947 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
2948 file_encryption::decrypt_file(ciphertext, meta, crypto, sender_fingerprint)
2949 }
2950}
2951
2952fn open_with_system(path: &str) -> Result<()> {
2954 #[cfg(target_os = "macos")]
2955 let cmd = "open";
2956 #[cfg(target_os = "linux")]
2957 let cmd = "xdg-open";
2958 #[cfg(target_os = "windows")]
2959 let cmd = "cmd";
2960 #[cfg(target_os = "windows")]
2961 let args = vec!["/C", "start", "", path];
2962 #[cfg(not(target_os = "windows"))]
2963 let args = vec![path];
2964
2965 std::process::Command::new(cmd)
2966 .args(args)
2967 .spawn()
2968 .map_err(|e| HuddleError::Other(format!("spawn opener: {e}")))?;
2969 Ok(())
2970}
2971
/// Process-wide map from `String` keys to byte vectors, created empty on first
/// access and guarded by a mutex.
///
/// NOTE(review): judging by the name this caches a salt per room id; no use is
/// visible in this part of the file, so confirm against the callers.
static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
    std::sync::LazyLock::new(Mutex::default);
2976
2977#[allow(dead_code)]
2978fn salt_len() -> usize {
2979 SALT_LEN
2980}
2981
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Never panics: if the system clock is set before 1970 the result is the
/// negative number of seconds to the epoch, and values that do not fit in an
/// `i64` saturate. (The previous `unwrap()` could panic on a misconfigured
/// clock, including while a mutex guard was held.)
fn now_unix() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX),
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_secs())
            .map_or(i64::MIN, |secs| -secs),
    }
}
2988
2989fn generate_join_passphrase() -> String {
2995 use rand::RngCore;
2996 let mut bytes = [0u8; 16];
2997 rand::thread_rng().fill_bytes(&mut bytes);
2998 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes)
3001}
3002
3003fn generate_alphanumeric_code(len: usize) -> String {
3008 use rand::Rng;
3009 const ALPHABET: &[u8] = b"ABCDEFGHJKMNPQRSTUVWXYZ23456789";
3010 let mut rng = rand::thread_rng();
3011 let mut out = String::with_capacity(len + 1);
3012 for i in 0..len {
3013 if i == 4 && len == 8 {
3014 out.push('-'); }
3016 let idx = rng.gen_range(0..ALPHABET.len());
3017 out.push(ALPHABET[idx] as char);
3018 }
3019 out
3020}
3021
/// Unit tests for [`parse_dial_address`].
#[cfg(test)]
mod parser_tests {
    use super::parse_dial_address;

    /// Parses `input`, panicking on failure, and renders the multiaddr as text.
    fn rendered(input: &str) -> String {
        parse_dial_address(input).unwrap().to_string()
    }

    #[test]
    fn parses_ipv4_port() {
        assert_eq!(rendered("10.3.72.53:9027"), "/ip4/10.3.72.53/tcp/9027");
    }

    #[test]
    fn parses_bracketed_ipv6() {
        assert_eq!(rendered("[::1]:9027"), "/ip6/::1/tcp/9027");
    }

    #[test]
    fn rejects_unbracketed_ipv6() {
        let message = parse_dial_address("fe80::1:9027")
            .unwrap_err()
            .to_string();
        assert!(message.contains("brackets"));
    }

    #[test]
    fn passes_through_raw_multiaddr() {
        assert_eq!(rendered("/ip4/1.2.3.4/tcp/9000"), "/ip4/1.2.3.4/tcp/9000");
    }

    #[test]
    fn empty_address_is_error() {
        let outcome = parse_dial_address(" ");
        assert!(outcome.is_err());
    }

    #[test]
    fn rejects_bad_port() {
        let outcome = parse_dial_address("1.2.3.4:notaport");
        assert!(outcome.is_err());
    }
}