1pub mod events;
2
3use std::collections::{HashMap, HashSet};
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, SystemTime, UNIX_EPOCH};
7
8use base64::engine::general_purpose::STANDARD as B64;
9use base64::Engine;
10use libp2p::{Multiaddr, PeerId};
11use tokio::sync::broadcast;
12use tracing::{debug, error, info, warn};
13
14use crate::config;
15use crate::crypto::passphrase::{self, KEY_LEN, SALT_LEN};
16use crate::crypto::RoomCrypto;
17use crate::error::{HuddleError, Result};
18use crate::files::encryption::{self as file_encryption, EncryptedFileMeta};
19use crate::files::FileManager;
20use crate::identity::Identity;
21use crate::network::events::NetworkEvent;
22use crate::network::protocol::{RoomAnnouncement, RoomMessage};
23use crate::network::{self, NetworkHandle, NetworkMode};
24use crate::storage::repo::{
25 self, derive_room_id, AttachmentStatus, KnownPeer, StoredAttachment, StoredRoom,
26 StoredRoomMember,
27};
28use crate::storage::{self, Db};
29
30pub use self::events::{AppEvent, DiscoveredRoom};
31
32#[derive(Debug, Clone)]
35pub struct KnownPeerStatus {
36 pub address: String,
37 pub label: Option<String>,
38 pub last_connected_at: Option<i64>,
39 pub connected_peer_id: Option<PeerId>,
40}
41
42pub fn parse_dial_address(input: &str) -> Result<Multiaddr> {
45 let trimmed = input.trim();
46 if trimmed.is_empty() {
47 return Err(HuddleError::Other("address is empty".into()));
48 }
49 if trimmed.starts_with('/') {
50 return trimmed
51 .parse::<Multiaddr>()
52 .map_err(|e| HuddleError::Other(format!("invalid multiaddr: {e}")));
53 }
54 if let Some(rest) = trimmed.strip_prefix('[') {
55 let (host, port) = rest
56 .split_once("]:")
57 .ok_or_else(|| HuddleError::Other(format!("expected [ipv6]:port, got {trimmed}")))?;
58 let port: u16 = port
59 .parse()
60 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
61 return format!("/ip6/{}/tcp/{}", host, port)
62 .parse::<Multiaddr>()
63 .map_err(|e| HuddleError::Other(format!("invalid ipv6 address: {e}")));
64 }
65 let (host, port) = trimmed
66 .rsplit_once(':')
67 .ok_or_else(|| HuddleError::Other(format!("expected ip:port, got {trimmed}")))?;
68 if host.contains(':') {
69 return Err(HuddleError::Other(format!(
70 "ambiguous IPv6 address — wrap host in brackets: [{host}]:{port}"
71 )));
72 }
73 let port: u16 = port
74 .parse()
75 .map_err(|_| HuddleError::Other(format!("invalid port: {port}")))?;
76 format!("/ip4/{}/tcp/{}", host, port)
77 .parse::<Multiaddr>()
78 .map_err(|e| HuddleError::Other(format!("invalid address: {e}")))
79}
80
81struct ActiveRoom {
83 info: StoredRoom,
84 crypto: Option<RoomCrypto>,
85 passphrase_key: Option<[u8; KEY_LEN]>,
88 members: HashSet<String>,
90 typers: HashMap<String, i64>,
93}
94
95const TYPING_TTL_SECS: i64 = 3;
96
97const DISCOVERED_TTL_SECS: i64 = 45;
100const ANNOUNCE_INTERVAL_SECS: u64 = 15;
101
102#[derive(Clone)]
103pub struct AppHandle {
104 identity: Arc<Identity>,
105 network: NetworkHandle,
106 mode: NetworkMode,
107 active_rooms: Arc<Mutex<HashMap<String, ActiveRoom>>>,
108 discovered_rooms: Arc<Mutex<HashMap<String, DiscoveredRoom>>>,
109 restorable_rooms: Arc<Mutex<HashMap<String, StoredRoom>>>,
113 connected_dial_addrs: Arc<Mutex<HashMap<String, PeerId>>>,
116 file_manager: Arc<FileManager>,
118 db: Db,
119 app_event_tx: broadcast::Sender<AppEvent>,
120}
121
122impl AppHandle {
123 pub async fn start() -> Result<Self> {
124 Self::start_with_options(NetworkMode::Mdns, 0, None).await
125 }
126
127 pub async fn start_with_options(
128 mode: NetworkMode,
129 port: u16,
130 master_key: Option<&[u8; 32]>,
131 ) -> Result<Self> {
132 config::ensure_data_dir()?;
133 if let Some(mk) = master_key {
134 let subkey = storage::keychain::derive_subkey(mk, b"megolm-persist");
135 crate::crypto::megolm::install_session_persist_key(subkey);
136 }
137 let db = storage::open_db(&config::db_path(), master_key)?;
138 Self::start_with_db_and_options(db, mode, port).await
139 }
140
141 pub async fn start_with_db(db: Db) -> Result<Self> {
142 Self::start_with_db_and_options(db, NetworkMode::Mdns, 0).await
143 }
144
145 pub async fn start_with_db_and_options(
146 db: Db,
147 mode: NetworkMode,
148 port: u16,
149 ) -> Result<Self> {
150 let identity = Self::load_or_create_identity(&db)?;
151 let identity = Arc::new(identity);
152 info!(fingerprint = %identity.fingerprint(), peer_id = %identity.peer_id(), mode = %mode.as_str(), port, "identity loaded");
153
154 let (net_event_tx, net_event_rx) = tokio::sync::mpsc::channel::<NetworkEvent>(256);
155 let (app_event_tx, _) = broadcast::channel::<AppEvent>(256);
156 let network = network::start_network_with(&identity, net_event_tx, mode, port)?;
157
158 let active_rooms = Arc::new(Mutex::new(HashMap::new()));
159 let discovered_rooms = Arc::new(Mutex::new(HashMap::new()));
160 let restorable_rooms = Arc::new(Mutex::new(HashMap::new()));
161 let connected_dial_addrs = Arc::new(Mutex::new(HashMap::new()));
162 let file_manager = Arc::new(FileManager::new(&config::data_dir())?);
163
164 let handle = Self {
165 identity,
166 network,
167 mode,
168 active_rooms,
169 discovered_rooms,
170 restorable_rooms,
171 connected_dial_addrs,
172 file_manager,
173 db,
174 app_event_tx,
175 };
176
177 handle.spawn_event_processor(net_event_rx);
178 handle.spawn_announcement_ticker();
179 handle.spawn_discovered_room_pruner();
180 handle.spawn_known_peer_reconnector();
181 handle.restore_rooms_from_db().await;
182
183 Ok(handle)
184 }
185
186 pub fn mode(&self) -> NetworkMode {
187 self.mode
188 }
189
190 pub fn subscribe(&self) -> broadcast::Receiver<AppEvent> {
191 self.app_event_tx.subscribe()
192 }
193
194 pub fn fingerprint(&self) -> &str {
195 self.identity.fingerprint()
196 }
197
198 pub fn peer_id(&self) -> PeerId {
199 self.identity.peer_id()
200 }
201
202 pub fn discovered_rooms(&self) -> Vec<DiscoveredRoom> {
203 let now = now_unix();
204 let mut by_id: HashMap<String, DiscoveredRoom> = self
205 .discovered_rooms
206 .lock()
207 .unwrap()
208 .clone();
209
210 for room in self.active_rooms.lock().unwrap().values() {
214 let entry = DiscoveredRoom {
215 room_id: room.info.id.clone(),
216 name: room.info.name.clone(),
217 encrypted: room.info.encrypted,
218 member_count: room.members.len() as u32,
219 creator_fingerprint: room.info.creator_fingerprint.clone(),
220 last_seen: now,
221 restorable: false,
222 };
223 by_id
224 .entry(room.info.id.clone())
225 .and_modify(|d| {
226 d.last_seen = now;
227 if entry.member_count > d.member_count {
228 d.member_count = entry.member_count;
229 }
230 d.restorable = false;
231 })
232 .or_insert(entry);
233 }
234
235 for (id, stored) in self.restorable_rooms.lock().unwrap().iter() {
239 if by_id.contains_key(id) {
240 continue;
241 }
242 by_id.insert(
243 id.clone(),
244 DiscoveredRoom {
245 room_id: id.clone(),
246 name: stored.name.clone(),
247 encrypted: stored.encrypted,
248 member_count: 0,
249 creator_fingerprint: stored.creator_fingerprint.clone(),
250 last_seen: stored.last_active.unwrap_or(stored.created_at),
251 restorable: true,
252 },
253 );
254 }
255
256 let mut v: Vec<DiscoveredRoom> = by_id.into_values().collect();
257 v.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
258 v
259 }
260
261 pub fn active_room_ids(&self) -> Vec<String> {
262 self.active_rooms.lock().unwrap().keys().cloned().collect()
263 }
264
265 pub fn active_room_info(&self, room_id: &str) -> Option<StoredRoom> {
266 self.active_rooms
267 .lock()
268 .unwrap()
269 .get(room_id)
270 .map(|r| r.info.clone())
271 }
272
273 pub fn room_members(&self, room_id: &str) -> Vec<String> {
274 self.active_rooms
275 .lock()
276 .unwrap()
277 .get(room_id)
278 .map(|r| {
279 let mut m: Vec<String> = r.members.iter().cloned().collect();
280 m.sort();
281 m
282 })
283 .unwrap_or_default()
284 }
285
286 pub fn room_messages(&self, room_id: &str, limit: i64) -> Result<Vec<repo::StoredRoomMessage>> {
287 repo::get_room_messages(&self.db, room_id, limit)
288 }
289
290 pub fn search_room_messages(
291 &self,
292 room_id: &str,
293 query: &str,
294 limit: i64,
295 ) -> Result<Vec<repo::StoredRoomMessage>> {
296 repo::search_room_messages(&self.db, room_id, query, limit)
297 }
298
299 pub async fn start_room(
301 &self,
302 name: &str,
303 encrypted: bool,
304 passphrase: Option<&str>,
305 ) -> Result<String> {
306 if encrypted && passphrase.is_none() {
307 return Err(HuddleError::Other(
308 "encrypted room requires a passphrase".into(),
309 ));
310 }
311
312 let created_at = now_unix();
313 let creator_fp = self.identity.fingerprint().to_string();
314 let room_id = derive_room_id(&creator_fp, name, created_at);
315
316 let (passphrase_salt, passphrase_key) = if encrypted {
317 let salt = passphrase::random_salt();
318 let key = passphrase::derive_key(passphrase.unwrap(), &salt)?;
319 (Some(salt.to_vec()), Some(key))
320 } else {
321 (None, None)
322 };
323
324 let info = StoredRoom {
325 id: room_id.clone(),
326 name: name.to_string(),
327 creator_fingerprint: creator_fp.clone(),
328 encrypted,
329 passphrase_salt: passphrase_salt.clone(),
330 created_at,
331 last_active: Some(created_at),
332 };
333 repo::insert_room(&self.db, &info)?;
334
335 let crypto = if encrypted {
336 Some(RoomCrypto::new_for_room(
337 self.db.clone(),
338 room_id.clone(),
339 creator_fp.clone(),
340 )?)
341 } else {
342 None
343 };
344
345 let mut members = HashSet::new();
346 members.insert(creator_fp.clone());
347
348 self.active_rooms.lock().unwrap().insert(
349 room_id.clone(),
350 ActiveRoom {
351 info: info.clone(),
352 crypto,
353 passphrase_key,
354 members,
355 typers: HashMap::new(),
356 },
357 );
358
359 self.network.subscribe_room(room_id.clone()).await;
360 self.announce_room_now(&info, 1).await;
361
362 let app = self.clone();
365 let rid = room_id.clone();
366 tokio::spawn(async move {
367 tokio::time::sleep(Duration::from_millis(500)).await;
368 if let Err(e) = app.broadcast_member_announce(&rid).await {
369 warn!(%e, "broadcast member announce");
370 }
371 });
372
373 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
374 room_id: room_id.clone(),
375 });
376
377 Ok(room_id)
378 }
379
380 pub async fn join_room(&self, room_id: &str, passphrase: Option<&str>) -> Result<()> {
384 let (name, creator_fingerprint, encrypted, salt_opt) = {
386 if let Some(d) = self.discovered_rooms.lock().unwrap().get(room_id).cloned() {
387 let salt = self.get_room_salt(room_id);
388 (d.name, d.creator_fingerprint, d.encrypted, salt)
389 } else if let Some(stored) = self.restorable_rooms.lock().unwrap().get(room_id).cloned()
390 {
391 (
392 stored.name,
393 stored.creator_fingerprint,
394 stored.encrypted,
395 stored.passphrase_salt,
396 )
397 } else if let Some(stored) = repo::get_room(&self.db, room_id)? {
398 (
399 stored.name,
400 stored.creator_fingerprint,
401 stored.encrypted,
402 stored.passphrase_salt,
403 )
404 } else {
405 return Err(HuddleError::Other(format!("room {room_id} not found")));
406 }
407 };
408
409 if encrypted && passphrase.is_none() {
410 return Err(HuddleError::Other(
411 "encrypted room requires a passphrase".into(),
412 ));
413 }
414
415 let passphrase_key = if encrypted {
416 let salt = salt_opt
417 .clone()
418 .ok_or_else(|| HuddleError::Other("missing salt for encrypted room".into()))?;
419 Some(passphrase::derive_key(passphrase.unwrap(), &salt)?)
420 } else {
421 None
422 };
423
424 let info = StoredRoom {
425 id: room_id.to_string(),
426 name,
427 creator_fingerprint,
428 encrypted,
429 passphrase_salt: salt_opt.clone(),
430 created_at: now_unix(),
431 last_active: Some(now_unix()),
432 };
433 repo::insert_room(&self.db, &info)?;
434
435 let crypto = if encrypted {
436 Some(RoomCrypto::new_for_room(
437 self.db.clone(),
438 room_id.to_string(),
439 self.identity.fingerprint().to_string(),
440 )?)
441 } else {
442 None
443 };
444
445 let mut members = HashSet::new();
446 members.insert(self.identity.fingerprint().to_string());
447
448 self.active_rooms.lock().unwrap().insert(
449 room_id.to_string(),
450 ActiveRoom {
451 info: info.clone(),
452 crypto,
453 passphrase_key,
454 members,
455 typers: HashMap::new(),
456 },
457 );
458 self.restorable_rooms.lock().unwrap().remove(room_id);
460
461 self.network.subscribe_room(room_id.to_string()).await;
462
463 let app = self.clone();
464 let rid = room_id.to_string();
465 tokio::spawn(async move {
466 tokio::time::sleep(Duration::from_millis(500)).await;
467 if let Err(e) = app.broadcast_member_announce(&rid).await {
468 warn!(%e, "broadcast member announce");
469 }
470 let req = RoomMessage::SessionKeyRequest {
472 requester_fingerprint: app.identity.fingerprint().to_string(),
473 };
474 if let Ok(bytes) = serde_json::to_vec(&req) {
475 app.network.publish_room_message(rid.clone(), bytes).await;
476 }
477 });
478
479 let _ = self.app_event_tx.send(AppEvent::RoomJoined {
480 room_id: room_id.to_string(),
481 });
482
483 Ok(())
484 }
485
486 async fn restore_rooms_from_db(&self) {
491 let rooms = match repo::list_rooms(&self.db) {
492 Ok(v) => v,
493 Err(e) => {
494 warn!(%e, "list rooms on restore");
495 return;
496 }
497 };
498 let our_fp = self.identity.fingerprint().to_string();
499 let count = rooms.len();
500 for info in rooms {
501 if info.encrypted {
502 self.restorable_rooms
503 .lock()
504 .unwrap()
505 .insert(info.id.clone(), info);
506 continue;
507 }
508 let mut members = HashSet::new();
509 members.insert(our_fp.clone());
510 if let Ok(stored_members) = repo::list_room_members(&self.db, &info.id) {
511 for m in stored_members {
512 members.insert(m.fingerprint);
513 }
514 }
515 self.active_rooms.lock().unwrap().insert(
516 info.id.clone(),
517 ActiveRoom {
518 info: info.clone(),
519 crypto: None,
520 passphrase_key: None,
521 members,
522 typers: HashMap::new(),
523 },
524 );
525 self.network.subscribe_room(info.id.clone()).await;
526 self.announce_room_now(&info, 1).await;
527 info!(room_id = %info.id, name = %info.name, "restored room");
528 }
529 if count > 0 {
530 debug!(count, "restored rooms from db");
531 }
532 }
533
534 pub async fn leave_room(&self, room_id: &str) -> Result<()> {
535 let leave_msg = RoomMessage::MemberLeave {
537 sender_fingerprint: self.identity.fingerprint().to_string(),
538 };
539 if let Ok(bytes) = serde_json::to_vec(&leave_msg) {
540 self.network
541 .publish_room_message(room_id.to_string(), bytes)
542 .await;
543 }
544
545 self.active_rooms.lock().unwrap().remove(room_id);
546 self.network.unsubscribe_room(room_id.to_string()).await;
547
548 let _ = self.app_event_tx.send(AppEvent::RoomLeft {
549 room_id: room_id.to_string(),
550 });
551 Ok(())
552 }
553
554 pub async fn send_room_message(&self, room_id: &str, body: &str) -> Result<()> {
555 let our_fp = self.identity.fingerprint().to_string();
556 let msg = {
557 let mut rooms = self.active_rooms.lock().unwrap();
558 let room = rooms
559 .get_mut(room_id)
560 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
561
562 if room.info.encrypted {
563 let crypto = room
564 .crypto
565 .as_mut()
566 .ok_or_else(|| HuddleError::Session("encrypted room missing crypto".into()))?;
567 let (session_id, ct_bytes) = crypto.encrypt(body.as_bytes())?;
568 RoomMessage::Encrypted {
569 sender_fingerprint: our_fp.clone(),
570 session_id,
571 ciphertext_b64: base64::Engine::encode(
572 &base64::engine::general_purpose::STANDARD,
573 &ct_bytes,
574 ),
575 }
576 } else {
577 RoomMessage::Plain {
578 sender_fingerprint: our_fp.clone(),
579 body: body.to_string(),
580 }
581 }
582 };
583
584 let bytes = serde_json::to_vec(&msg)?;
585 self.network
586 .publish_room_message(room_id.to_string(), bytes)
587 .await;
588
589 let now = now_unix();
590 let msg_id =
591 repo::insert_room_message(&self.db, room_id, &our_fp, "out", body, now)?;
592 repo::update_room_last_active(&self.db, room_id, now)?;
593
594 let _ = self.app_event_tx.send(AppEvent::MessageSent {
595 room_id: room_id.to_string(),
596 body: body.to_string(),
597 message_id: msg_id,
598 });
599
600 Ok(())
601 }
602
603 pub async fn shutdown(&self) {
604 self.network.shutdown().await;
605 }
606
607 pub async fn dial(&self, input: &str) -> Result<()> {
616 let multiaddr = parse_dial_address(input)?;
617 let canonical = multiaddr.to_string();
618 info!(%canonical, "dialing");
619
620 repo::upsert_known_peer(
621 &self.db,
622 &KnownPeer {
623 address: canonical.clone(),
624 label: None,
625 last_connected_at: None,
626 last_attempt_at: Some(now_unix()),
627 created_at: now_unix(),
628 },
629 )?;
630
631 let _ = self.app_event_tx.send(AppEvent::Dialing {
632 address: canonical.clone(),
633 });
634 self.network.dial(multiaddr).await;
635 Ok(())
636 }
637
638 pub fn known_peers(&self) -> Vec<KnownPeerStatus> {
639 let connected = self.connected_dial_addrs.lock().unwrap().clone();
640 let stored = repo::list_known_peers(&self.db).unwrap_or_default();
641 stored
642 .into_iter()
643 .map(|p| {
644 let connected_peer = connected.get(&p.address).copied();
645 KnownPeerStatus {
646 address: p.address,
647 label: p.label,
648 last_connected_at: p.last_connected_at,
649 connected_peer_id: connected_peer,
650 }
651 })
652 .collect()
653 }
654
655 pub async fn forget_peer(&self, address: &str) -> Result<()> {
656 repo::forget_known_peer(&self.db, address)?;
657 self.connected_dial_addrs.lock().unwrap().remove(address);
658 Ok(())
659 }
660
661 pub async fn redial(&self, address: &str) -> Result<()> {
663 self.dial(address).await
664 }
665
666 fn spawn_known_peer_reconnector(&self) {
667 let handle = self.clone();
668 tokio::spawn(async move {
669 tokio::time::sleep(Duration::from_millis(500)).await;
671 let known = repo::list_known_peers(&handle.db).unwrap_or_default();
672 for peer in known {
673 if let Err(e) = handle.dial(&peer.address).await {
674 debug!(%e, addr = %peer.address, "auto-reconnect failed");
675 }
676 }
677 });
678 }
679
680 fn load_or_create_identity(db: &Db) -> Result<Identity> {
685 if let Some(stored) = repo::load_identity(db)? {
686 let mut bytes = [0u8; 32];
687 bytes.copy_from_slice(&stored.ed25519_secret);
688 Identity::from_secret_bytes(bytes)
689 } else {
690 let id = Identity::generate()?;
691 repo::save_identity(db, &id.secret_bytes(), now_unix())?;
692 Ok(id)
693 }
694 }
695
696 fn get_room_salt(&self, room_id: &str) -> Option<Vec<u8>> {
697 self.active_rooms
698 .lock()
699 .unwrap()
700 .get(room_id)
701 .and_then(|r| r.info.passphrase_salt.clone())
702 .or_else(|| {
703 ROOM_SALT_CACHE
705 .lock()
706 .unwrap()
707 .get(room_id)
708 .cloned()
709 })
710 }
711
712 async fn announce_room_now(&self, info: &StoredRoom, member_count: u32) {
713 let ann = RoomAnnouncement {
714 room_id: info.id.clone(),
715 name: info.name.clone(),
716 encrypted: info.encrypted,
717 passphrase_salt: info.passphrase_salt.clone(),
718 member_count,
719 creator_fingerprint: info.creator_fingerprint.clone(),
720 announced_at: now_unix(),
721 };
722 self.network.announce_room(ann).await;
723 }
724
725 async fn broadcast_member_announce(&self, room_id: &str) -> Result<()> {
726 let our_fp = self.identity.fingerprint().to_string();
727 let wrapped = {
728 let mut rooms = self.active_rooms.lock().unwrap();
729 let room = rooms
730 .get_mut(room_id)
731 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
732 if room.info.encrypted {
733 let crypto = room.crypto.as_mut().unwrap();
734 let session_key = crypto.our_session_key_b64();
735 let passphrase_key = room
736 .passphrase_key
737 .as_ref()
738 .ok_or_else(|| HuddleError::Session("missing passphrase key".into()))?;
739 Some(passphrase::wrap(session_key.as_bytes(), passphrase_key)?)
740 } else {
741 None
742 }
743 };
744 let display_name = repo::get_display_name(&self.db).unwrap_or(None);
745 let msg = RoomMessage::MemberAnnounce {
746 sender_fingerprint: our_fp,
747 wrapped_session_key: wrapped,
748 display_name,
749 };
750 let bytes = serde_json::to_vec(&msg)?;
751 self.network
752 .publish_room_message(room_id.to_string(), bytes)
753 .await;
754 Ok(())
755 }
756
757 fn spawn_event_processor(&self, mut net_rx: tokio::sync::mpsc::Receiver<NetworkEvent>) {
758 let handle = self.clone();
759 tokio::spawn(async move {
760 while let Some(event) = net_rx.recv().await {
761 handle.process_network_event(event).await;
762 }
763 info!("event processor stopped");
764 });
765 }
766
767 fn spawn_announcement_ticker(&self) {
768 let handle = self.clone();
769 tokio::spawn(async move {
770 let mut interval =
771 tokio::time::interval(Duration::from_secs(ANNOUNCE_INTERVAL_SECS));
772 interval.tick().await; loop {
774 interval.tick().await;
775 let snapshot: Vec<(StoredRoom, u32)> = {
776 let active = handle.active_rooms.lock().unwrap();
777 active
778 .values()
779 .map(|r| (r.info.clone(), r.members.len() as u32))
780 .collect()
781 };
782 for (info, member_count) in snapshot {
783 handle.announce_room_now(&info, member_count).await;
784 }
785 }
786 });
787 }
788
789 fn spawn_discovered_room_pruner(&self) {
790 let handle = self.clone();
791 tokio::spawn(async move {
792 let mut interval = tokio::time::interval(Duration::from_secs(10));
793 interval.tick().await;
794 loop {
795 interval.tick().await;
796 let now = now_unix();
797 let mut to_drop = Vec::new();
798 {
799 let mut map = handle.discovered_rooms.lock().unwrap();
800 map.retain(|id, r| {
801 if now - r.last_seen > DISCOVERED_TTL_SECS {
802 to_drop.push(id.clone());
803 false
804 } else {
805 true
806 }
807 });
808 }
809 for id in to_drop {
810 let _ = handle.app_event_tx.send(AppEvent::RoomLost { room_id: id });
811 }
812 }
813 });
814 }
815
816 async fn process_network_event(&self, event: NetworkEvent) {
817 match event {
818 NetworkEvent::PeerDiscovered { peer_id } => {
819 let _ = self.app_event_tx.send(AppEvent::PeerDiscovered { peer_id });
820 }
821 NetworkEvent::PeerExpired { .. } => {}
822 NetworkEvent::ListeningOn { address } => {
823 let _ = self.app_event_tx.send(AppEvent::ListeningOn {
824 address: address.to_string(),
825 });
826 }
827 NetworkEvent::RoomAnnouncementReceived(ann) => {
828 let our_fp = self.identity.fingerprint();
829 if let Some(salt) = &ann.passphrase_salt {
831 ROOM_SALT_CACHE
832 .lock()
833 .unwrap()
834 .insert(ann.room_id.clone(), salt.clone());
835 }
836 let discovered = DiscoveredRoom {
837 room_id: ann.room_id.clone(),
838 name: ann.name.clone(),
839 encrypted: ann.encrypted,
840 member_count: ann.member_count,
841 creator_fingerprint: ann.creator_fingerprint.clone(),
842 last_seen: now_unix(),
843 restorable: false,
844 };
845 if ann.creator_fingerprint == our_fp
847 && self.active_rooms.lock().unwrap().contains_key(&ann.room_id)
848 {
849 self.discovered_rooms
851 .lock()
852 .unwrap()
853 .insert(ann.room_id.clone(), discovered);
854 return;
855 }
856 self.discovered_rooms
857 .lock()
858 .unwrap()
859 .insert(ann.room_id.clone(), discovered.clone());
860 let _ = self.app_event_tx.send(AppEvent::RoomDiscovered(discovered));
861 }
862 NetworkEvent::RoomMessageReceived {
863 room_id,
864 payload,
865 from_peer: _,
866 } => {
867 let msg: RoomMessage = match serde_json::from_slice(&payload) {
868 Ok(m) => m,
869 Err(e) => {
870 warn!(%e, "bad room message");
871 return;
872 }
873 };
874 self.handle_room_message(&room_id, msg).await;
875 }
876 NetworkEvent::DialSucceeded { peer_id, address } => {
877 let addr_s = address.to_string();
878 self.connected_dial_addrs
879 .lock()
880 .unwrap()
881 .insert(addr_s.clone(), peer_id);
882 let _ = repo::upsert_known_peer(
883 &self.db,
884 &KnownPeer {
885 address: addr_s.clone(),
886 label: None,
887 last_connected_at: Some(now_unix()),
888 last_attempt_at: Some(now_unix()),
889 created_at: now_unix(),
890 },
891 );
892 let _ = self.app_event_tx.send(AppEvent::DialSucceeded {
893 address: addr_s,
894 peer_id,
895 });
896 }
897 NetworkEvent::DialFailed { address, error } => {
898 let addr_s = address.to_string();
899 let _ = self.app_event_tx.send(AppEvent::DialFailed {
900 address: addr_s,
901 error,
902 });
903 }
904 }
905 }
906
907 async fn handle_room_message(&self, room_id: &str, msg: RoomMessage) {
908 let our_fp = self.identity.fingerprint().to_string();
909 match msg {
910 RoomMessage::MemberAnnounce {
911 sender_fingerprint,
912 wrapped_session_key,
913 display_name,
914 } => {
915 if sender_fingerprint == our_fp {
916 return;
917 }
918 let need_inbound = {
919 let mut rooms = self.active_rooms.lock().unwrap();
920 let room = match rooms.get_mut(room_id) {
921 Some(r) => r,
922 None => return,
923 };
924 let newly_added = room.members.insert(sender_fingerprint.clone());
925 if newly_added {
926 let _ = self.app_event_tx.send(AppEvent::MemberJoined {
927 room_id: room_id.to_string(),
928 fingerprint: sender_fingerprint.clone(),
929 });
930 }
931 let _ = repo::upsert_room_member(
933 &self.db,
934 &StoredRoomMember {
935 room_id: room_id.to_string(),
936 peer_id: String::new(), fingerprint: sender_fingerprint.clone(),
938 last_seen: Some(now_unix()),
939 verified: false,
940 },
941 );
942 if let Some(name) = display_name.as_deref() {
943 let _ = repo::set_member_display_name(
944 &self.db,
945 room_id,
946 &sender_fingerprint,
947 Some(name),
948 );
949 }
950 room.info.encrypted && wrapped_session_key.is_some()
951 };
952
953 if need_inbound {
954 let wrapped = wrapped_session_key.unwrap();
955 let result = {
956 let mut rooms = self.active_rooms.lock().unwrap();
957 let room = rooms.get_mut(room_id).unwrap();
958 let passphrase_key = match &room.passphrase_key {
959 Some(k) => k,
960 None => {
961 warn!("no passphrase key when receiving session key");
962 return;
963 }
964 };
965 match passphrase::unwrap(&wrapped, passphrase_key) {
966 Ok(plain) => match String::from_utf8(plain) {
967 Ok(key_b64) => {
968 let crypto = room.crypto.as_mut().unwrap();
969 crypto.add_inbound_session(&sender_fingerprint, &key_b64)
970 }
971 Err(e) => Err(HuddleError::Session(format!("utf8: {e}"))),
972 },
973 Err(e) => Err(e),
974 }
975 };
976 if let Err(e) = result {
977 error!(%e, "add inbound session failed");
978 }
979 }
980 }
981 RoomMessage::SessionKeyRequest {
982 requester_fingerprint,
983 } => {
984 if requester_fingerprint == our_fp {
985 return;
986 }
987 if let Err(e) = self.broadcast_member_announce(room_id).await {
989 warn!(%e, "broadcast member announce on request");
990 }
991 }
992 RoomMessage::Encrypted {
993 sender_fingerprint,
994 session_id,
995 ciphertext_b64,
996 } => {
997 if sender_fingerprint == our_fp {
998 return;
999 }
1000 let ct_bytes = match base64::Engine::decode(
1001 &base64::engine::general_purpose::STANDARD,
1002 &ciphertext_b64,
1003 ) {
1004 Ok(b) => b,
1005 Err(e) => {
1006 warn!(%e, "bad base64 ciphertext");
1007 return;
1008 }
1009 };
1010 let plaintext = {
1011 let mut rooms = self.active_rooms.lock().unwrap();
1012 let room = match rooms.get_mut(room_id) {
1013 Some(r) => r,
1014 None => return,
1015 };
1016 let crypto = match room.crypto.as_mut() {
1017 Some(c) => c,
1018 None => return,
1019 };
1020 crypto.decrypt(&sender_fingerprint, &session_id, &ct_bytes)
1021 };
1022 match plaintext {
1023 Ok(pt) => {
1024 let body = String::from_utf8_lossy(&pt).to_string();
1025 let sent_at = now_unix();
1026 let _ = repo::insert_room_message(
1027 &self.db,
1028 room_id,
1029 &sender_fingerprint,
1030 "in",
1031 &body,
1032 sent_at,
1033 );
1034 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1035 self.maybe_emit_mention(room_id, &body);
1036 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1037 room_id: room_id.to_string(),
1038 sender_fingerprint,
1039 body,
1040 sent_at,
1041 });
1042 }
1043 Err(e) => {
1044 debug!(%e, "decrypt failed (probably missing session key)");
1045 }
1046 }
1047 }
1048 RoomMessage::Plain {
1049 sender_fingerprint,
1050 body,
1051 } => {
1052 if sender_fingerprint == our_fp {
1053 return;
1054 }
1055 let sent_at = now_unix();
1056 let _ = repo::insert_room_message(
1057 &self.db,
1058 room_id,
1059 &sender_fingerprint,
1060 "in",
1061 &body,
1062 sent_at,
1063 );
1064 let _ = repo::update_room_last_active(&self.db, room_id, sent_at);
1065 self.maybe_emit_mention(room_id, &body);
1066 let _ = self.app_event_tx.send(AppEvent::MessageReceived {
1067 room_id: room_id.to_string(),
1068 sender_fingerprint,
1069 body,
1070 sent_at,
1071 });
1072 }
1073 RoomMessage::Typing { sender_fingerprint } => {
1074 if sender_fingerprint == our_fp {
1075 return;
1076 }
1077 let expiry = now_unix() + TYPING_TTL_SECS;
1078 let mut rooms = self.active_rooms.lock().unwrap();
1079 if let Some(room) = rooms.get_mut(room_id) {
1080 room.typers.insert(sender_fingerprint, expiry);
1081 }
1082 drop(rooms);
1083 let _ = self.app_event_tx.send(AppEvent::TypingChanged {
1084 room_id: room_id.to_string(),
1085 });
1086 }
1087 RoomMessage::RotateRoomKey {
1088 rotator_fingerprint,
1089 new_salt,
1090 } => {
1091 if rotator_fingerprint == our_fp {
1092 return;
1093 }
1094 let _ = self.app_event_tx.send(AppEvent::RotationRequested {
1095 room_id: room_id.to_string(),
1096 rotator_fingerprint,
1097 new_salt,
1098 });
1099 }
1100 RoomMessage::MemberLeave { sender_fingerprint } => {
1101 if sender_fingerprint == our_fp {
1102 return;
1103 }
1104 let removed = {
1105 let mut rooms = self.active_rooms.lock().unwrap();
1106 if let Some(room) = rooms.get_mut(room_id) {
1107 room.members.remove(&sender_fingerprint)
1108 } else {
1109 false
1110 }
1111 };
1112 if removed {
1113 let _ = self.app_event_tx.send(AppEvent::MemberLeft {
1114 room_id: room_id.to_string(),
1115 fingerprint: sender_fingerprint,
1116 });
1117 }
1118 }
1119 RoomMessage::FileOffer {
1120 sender_fingerprint,
1121 file_id,
1122 name,
1123 size_bytes,
1124 mime,
1125 chunk_count,
1126 encrypted_meta,
1127 } => {
1128 if sender_fingerprint == our_fp {
1129 return; }
1131 self.handle_file_offer(
1132 room_id,
1133 sender_fingerprint,
1134 file_id,
1135 name,
1136 size_bytes,
1137 mime,
1138 chunk_count,
1139 encrypted_meta,
1140 );
1141 }
1142 RoomMessage::FileChunk {
1143 sender_fingerprint,
1144 file_id,
1145 chunk_index,
1146 total_chunks,
1147 data_b64,
1148 } => {
1149 if sender_fingerprint == our_fp {
1150 return;
1151 }
1152 self.handle_file_chunk(
1153 room_id,
1154 sender_fingerprint,
1155 file_id,
1156 chunk_index,
1157 total_chunks,
1158 data_b64,
1159 );
1160 }
1161 }
1162 }
1163
1164 pub async fn send_file(&self, room_id: &str, path: &Path) -> Result<String> {
1172 let bytes = std::fs::read(path)?;
1173 let name = path
1174 .file_name()
1175 .map(|n| n.to_string_lossy().to_string())
1176 .unwrap_or_else(|| "untitled".into());
1177 let mime = crate::files::guess_mime(&name);
1178 let original_path = path.to_path_buf();
1179
1180 let (room_encrypted, mut maybe_session_id, encrypted_meta_opt, wire_bytes) = {
1181 let mut rooms = self.active_rooms.lock().unwrap();
1182 let room = rooms
1183 .get_mut(room_id)
1184 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1185 if room.info.encrypted {
1186 let crypto = room
1187 .crypto
1188 .as_mut()
1189 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
1190 let (ciphertext, meta) = file_encryption::encrypt_file(&bytes, crypto)?;
1191 (true, Some(meta.megolm_session_id.clone()), Some(meta), ciphertext)
1192 } else {
1193 (false, None, None, bytes)
1194 }
1195 };
1196 let _ = &mut maybe_session_id; let plan =
1199 self.file_manager
1200 .prepare_outgoing_from_bytes(&name, mime.clone(), wire_bytes)?;
1201 let file_id = plan.file_id.clone();
1202 let total = plan.chunks.len() as u32;
1203 let our_fp = self.identity.fingerprint().to_string();
1204
1205 let attachment = StoredAttachment {
1206 id: 0,
1207 room_id: room_id.to_string(),
1208 message_id: None,
1209 sender_fingerprint: our_fp.clone(),
1210 file_id: file_id.clone(),
1211 name: name.clone(),
1212 mime: mime.clone(),
1213 size_bytes: plan.size_bytes as i64,
1214 status: AttachmentStatus::Ready,
1215 cache_path: Some(self.file_manager.cache_path(&file_id).to_string_lossy().into()),
1216 saved_path: Some(original_path.to_string_lossy().into()),
1217 error: None,
1218 encrypted: room_encrypted,
1219 wrapped_key: encrypted_meta_opt.as_ref().map(|m| m.wrapped_key_b64.clone()),
1220 nonce: encrypted_meta_opt.as_ref().map(|m| m.nonce_b64.clone()),
1221 megolm_session_id: encrypted_meta_opt
1222 .as_ref()
1223 .map(|m| m.megolm_session_id.clone()),
1224 created_at: now_unix(),
1225 };
1226 repo::upsert_attachment(&self.db, &attachment)?;
1227 let _ = self.app_event_tx.send(AppEvent::FileOffered {
1228 room_id: room_id.to_string(),
1229 file_id: file_id.clone(),
1230 name: name.clone(),
1231 size_bytes: plan.size_bytes,
1232 sender_fingerprint: our_fp.clone(),
1233 });
1234
1235 let offer = RoomMessage::FileOffer {
1237 sender_fingerprint: our_fp.clone(),
1238 file_id: file_id.clone(),
1239 name,
1240 size_bytes: plan.size_bytes,
1241 mime,
1242 chunk_count: total,
1243 encrypted_meta: encrypted_meta_opt,
1244 };
1245 if let Ok(bytes) = serde_json::to_vec(&offer) {
1246 self.network
1247 .publish_room_message(room_id.to_string(), bytes)
1248 .await;
1249 }
1250
1251 let net = self.network.clone();
1254 let room = room_id.to_string();
1255 let our = our_fp.clone();
1256 let fid = file_id.clone();
1257 let chunks = plan.chunks.clone();
1258 tokio::spawn(async move {
1259 for (i, data) in chunks.iter().enumerate() {
1260 let msg = RoomMessage::FileChunk {
1261 sender_fingerprint: our.clone(),
1262 file_id: fid.clone(),
1263 chunk_index: i as u32,
1264 total_chunks: total,
1265 data_b64: B64.encode(data),
1266 };
1267 if let Ok(bytes) = serde_json::to_vec(&msg) {
1268 net.publish_room_message(room.clone(), bytes).await;
1269 }
1270 tokio::time::sleep(Duration::from_millis(40)).await;
1271 }
1272 });
1273
1274 Ok(file_id)
1275 }
1276
1277 pub async fn save_to_downloads(&self, room_id: &str, file_id: &str) -> Result<PathBuf> {
1280 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
1281 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
1282 if !matches!(
1283 attachment.status,
1284 AttachmentStatus::Ready | AttachmentStatus::Saved
1285 ) {
1286 return Err(HuddleError::Other(format!(
1287 "attachment is not ready (status={})",
1288 attachment.status.as_str()
1289 )));
1290 }
1291 let cached = self.file_manager.read_cache(file_id)?;
1292 let plaintext = if attachment.encrypted {
1293 let meta = EncryptedFileMeta {
1294 megolm_session_id: attachment
1295 .megolm_session_id
1296 .clone()
1297 .ok_or_else(|| HuddleError::Other("missing megolm_session_id".into()))?,
1298 wrapped_key_b64: attachment
1299 .wrapped_key
1300 .clone()
1301 .ok_or_else(|| HuddleError::Other("missing wrapped_key".into()))?,
1302 nonce_b64: attachment
1303 .nonce
1304 .clone()
1305 .ok_or_else(|| HuddleError::Other("missing nonce".into()))?,
1306 };
1307 if attachment.sender_fingerprint == self.identity.fingerprint() {
1313 return Err(HuddleError::Other(
1314 "this attachment is your own — use [o] open to open the source file".into(),
1315 ));
1316 }
1317 self.decrypt_attachment(room_id, &attachment.sender_fingerprint, &cached, &meta)?
1318 } else {
1319 cached
1320 };
1321 let saved = self.file_manager.write_to_downloads(&attachment.name, &plaintext)?;
1322 repo::update_attachment_paths(
1323 &self.db,
1324 room_id,
1325 file_id,
1326 None,
1327 Some(&saved.to_string_lossy()),
1328 )?;
1329 repo::update_attachment_status(&self.db, room_id, file_id, AttachmentStatus::Saved, None)?;
1330 let _ = self.app_event_tx.send(AppEvent::FileSaved {
1331 file_id: file_id.into(),
1332 path: saved.to_string_lossy().into(),
1333 });
1334 Ok(saved)
1335 }
1336
1337 pub async fn cancel_transfer(&self, room_id: &str, file_id: &str) -> Result<()> {
1339 self.file_manager.cancel_incoming(file_id);
1340 repo::update_attachment_status(
1341 &self.db,
1342 room_id,
1343 file_id,
1344 AttachmentStatus::Cancelled,
1345 None,
1346 )?;
1347 Ok(())
1348 }
1349
1350 pub fn open_saved(&self, room_id: &str, file_id: &str) -> Result<()> {
1352 let attachment = repo::get_attachment(&self.db, room_id, file_id)?
1353 .ok_or_else(|| HuddleError::Other("attachment not found".into()))?;
1354 let path = attachment
1355 .saved_path
1356 .ok_or_else(|| HuddleError::Other("not saved yet — press Enter to save first".into()))?;
1357 open_with_system(&path)
1358 }
1359
1360 pub fn list_room_attachments(&self, room_id: &str) -> Result<Vec<StoredAttachment>> {
1361 repo::list_room_attachments(&self.db, room_id)
1362 }
1363
1364 pub fn set_member_verified(
1368 &self,
1369 room_id: &str,
1370 fingerprint: &str,
1371 verified: bool,
1372 ) -> Result<()> {
1373 let members = repo::list_room_members(&self.db, room_id).unwrap_or_default();
1378 if !members.iter().any(|m| m.fingerprint == fingerprint) {
1379 repo::upsert_room_member(
1380 &self.db,
1381 &StoredRoomMember {
1382 room_id: room_id.to_string(),
1383 peer_id: String::new(),
1384 fingerprint: fingerprint.to_string(),
1385 last_seen: Some(now_unix()),
1386 verified,
1387 },
1388 )?;
1389 }
1390 repo::set_member_verified(&self.db, room_id, fingerprint, verified)
1391 }
1392
1393 pub fn verified_fingerprints(&self, room_id: &str) -> Vec<String> {
1394 repo::list_verified_fingerprints(&self.db, room_id).unwrap_or_default()
1395 }
1396
1397 pub fn display_name(&self) -> Option<String> {
1398 repo::get_display_name(&self.db).unwrap_or(None)
1399 }
1400
1401 pub fn set_display_name(&self, name: Option<&str>) -> Result<()> {
1402 repo::set_display_name(&self.db, name)
1403 }
1404
1405 pub fn lookup_member_display_name(&self, fingerprint: &str) -> Option<String> {
1407 repo::lookup_display_name(&self.db, fingerprint).unwrap_or(None)
1408 }
1409
1410 pub fn is_room_muted(&self, room_id: &str) -> bool {
1411 repo::is_room_muted(&self.db, room_id).unwrap_or(false)
1412 }
1413
1414 pub fn set_room_muted(&self, room_id: &str, muted: bool) -> Result<()> {
1415 repo::set_room_muted(&self.db, room_id, muted)
1416 }
1417
1418 pub async fn broadcast_typing(&self, room_id: &str) {
1421 if !self.active_rooms.lock().unwrap().contains_key(room_id) {
1422 return;
1423 }
1424 let msg = RoomMessage::Typing {
1425 sender_fingerprint: self.identity.fingerprint().to_string(),
1426 };
1427 if let Ok(bytes) = serde_json::to_vec(&msg) {
1428 self.network
1429 .publish_room_message(room_id.to_string(), bytes)
1430 .await;
1431 }
1432 }
1433
1434 pub fn typers_in_room(&self, room_id: &str) -> Vec<String> {
1437 let now = now_unix();
1438 let mut rooms = self.active_rooms.lock().unwrap();
1439 let room = match rooms.get_mut(room_id) {
1440 Some(r) => r,
1441 None => return Vec::new(),
1442 };
1443 room.typers.retain(|_, exp| *exp > now);
1444 let mut v: Vec<String> = room.typers.keys().cloned().collect();
1445 v.sort();
1446 v
1447 }
1448
1449 pub async fn rotate_room(&self, room_id: &str, new_passphrase: &str) -> Result<()> {
1459 if new_passphrase.is_empty() {
1460 return Err(HuddleError::Other("new passphrase is empty".into()));
1461 }
1462 let new_salt = passphrase::random_salt();
1463 let new_key = passphrase::derive_key(new_passphrase, &new_salt)?;
1464
1465 let info = {
1466 let mut rooms = self.active_rooms.lock().unwrap();
1467 let room = rooms
1468 .get_mut(room_id)
1469 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1470 if !room.info.encrypted {
1471 return Err(HuddleError::Other(
1472 "rotation only applies to encrypted rooms".into(),
1473 ));
1474 }
1475 let new_crypto = RoomCrypto::new_for_room(
1477 self.db.clone(),
1478 room_id.to_string(),
1479 self.identity.fingerprint().to_string(),
1480 )?;
1481 room.crypto = Some(new_crypto);
1482 room.passphrase_key = Some(new_key);
1483 room.info.passphrase_salt = Some(new_salt.to_vec());
1484 room.info.clone()
1485 };
1486
1487 repo::insert_room(&self.db, &info)?;
1489
1490 let rot = RoomMessage::RotateRoomKey {
1492 rotator_fingerprint: self.identity.fingerprint().to_string(),
1493 new_salt: new_salt.to_vec(),
1494 };
1495 if let Ok(bytes) = serde_json::to_vec(&rot) {
1496 self.network
1497 .publish_room_message(room_id.to_string(), bytes)
1498 .await;
1499 }
1500
1501 if let Err(e) = self.broadcast_member_announce(room_id).await {
1503 warn!(%e, "rotate: broadcast announce failed");
1504 }
1505 Ok(())
1506 }
1507
1508 pub async fn accept_rotation(
1512 &self,
1513 room_id: &str,
1514 new_salt: &[u8],
1515 new_passphrase: &str,
1516 ) -> Result<()> {
1517 let new_key = passphrase::derive_key(new_passphrase, new_salt)?;
1518 let info = {
1519 let mut rooms = self.active_rooms.lock().unwrap();
1520 let room = rooms
1521 .get_mut(room_id)
1522 .ok_or_else(|| HuddleError::Other(format!("not in room {room_id}")))?;
1523 room.passphrase_key = Some(new_key);
1524 room.info.passphrase_salt = Some(new_salt.to_vec());
1525 room.info.clone()
1526 };
1527 repo::insert_room(&self.db, &info)?;
1528 let req = RoomMessage::SessionKeyRequest {
1530 requester_fingerprint: self.identity.fingerprint().to_string(),
1531 };
1532 if let Ok(bytes) = serde_json::to_vec(&req) {
1533 self.network
1534 .publish_room_message(room_id.to_string(), bytes)
1535 .await;
1536 }
1537 Ok(())
1538 }
1539
1540 #[allow(clippy::too_many_arguments)]
1545 fn handle_file_offer(
1546 &self,
1547 room_id: &str,
1548 sender_fingerprint: String,
1549 file_id: String,
1550 name: String,
1551 size_bytes: u64,
1552 mime: Option<String>,
1553 _chunk_count: u32,
1554 encrypted_meta: Option<EncryptedFileMeta>,
1555 ) {
1556 let encrypted = encrypted_meta.is_some();
1557 let attachment = StoredAttachment {
1558 id: 0,
1559 room_id: room_id.to_string(),
1560 message_id: None,
1561 sender_fingerprint: sender_fingerprint.clone(),
1562 file_id: file_id.clone(),
1563 name: name.clone(),
1564 mime,
1565 size_bytes: size_bytes as i64,
1566 status: AttachmentStatus::Offered,
1567 cache_path: None,
1568 saved_path: None,
1569 error: None,
1570 encrypted,
1571 wrapped_key: encrypted_meta.as_ref().map(|m| m.wrapped_key_b64.clone()),
1572 nonce: encrypted_meta.as_ref().map(|m| m.nonce_b64.clone()),
1573 megolm_session_id: encrypted_meta.as_ref().map(|m| m.megolm_session_id.clone()),
1574 created_at: now_unix(),
1575 };
1576 if let Err(e) = repo::upsert_attachment(&self.db, &attachment) {
1577 warn!(%e, "upsert attachment");
1578 return;
1579 }
1580 let _ = self.app_event_tx.send(AppEvent::FileOffered {
1581 room_id: room_id.to_string(),
1582 file_id,
1583 name,
1584 size_bytes,
1585 sender_fingerprint,
1586 });
1587 }
1588
1589 fn handle_file_chunk(
1590 &self,
1591 room_id: &str,
1592 _sender_fingerprint: String,
1593 file_id: String,
1594 chunk_index: u32,
1595 total_chunks: u32,
1596 data_b64: String,
1597 ) {
1598 let data = match B64.decode(&data_b64) {
1599 Ok(d) => d,
1600 Err(e) => {
1601 warn!(%e, "bad chunk base64");
1602 return;
1603 }
1604 };
1605 let expected_size = repo::get_attachment(&self.db, room_id, &file_id)
1607 .ok()
1608 .flatten()
1609 .map(|a| a.size_bytes as u64)
1610 .unwrap_or(crate::files::MAX_FILE_SIZE);
1611
1612 let result = self.file_manager.accept_chunk(
1613 &file_id,
1614 chunk_index,
1615 total_chunks,
1616 data,
1617 expected_size,
1618 );
1619 match result {
1620 Ok(None) => {
1621 let _ = repo::update_attachment_status(
1623 &self.db,
1624 room_id,
1625 &file_id,
1626 AttachmentStatus::Downloading,
1627 None,
1628 );
1629 let bytes_so_far = self
1632 .file_manager
1633 .progress(&file_id)
1634 .map(|(b, _)| b)
1635 .unwrap_or(0);
1636 let _ = self.app_event_tx.send(AppEvent::FileProgress {
1637 file_id: file_id.clone(),
1638 bytes_received: bytes_so_far,
1639 total_bytes: expected_size,
1640 });
1641 }
1642 Ok(Some(completed)) => {
1643 let _ = repo::update_attachment_paths(
1644 &self.db,
1645 room_id,
1646 &file_id,
1647 Some(&completed.cache_path.to_string_lossy()),
1648 None,
1649 );
1650 let _ = repo::update_attachment_status(
1651 &self.db,
1652 room_id,
1653 &file_id,
1654 AttachmentStatus::Ready,
1655 None,
1656 );
1657 let _ = self.app_event_tx.send(AppEvent::FileReady {
1658 file_id: file_id.clone(),
1659 });
1660 }
1661 Err(e) => {
1662 let msg = e.to_string();
1663 warn!(%msg, "chunk processing failed");
1664 let _ = repo::update_attachment_status(
1665 &self.db,
1666 room_id,
1667 &file_id,
1668 AttachmentStatus::Failed,
1669 Some(&msg),
1670 );
1671 let _ = self.app_event_tx.send(AppEvent::FileFailed {
1672 file_id: file_id.clone(),
1673 reason: msg,
1674 });
1675 }
1676 }
1677 }
1678
1679 fn maybe_emit_mention(&self, room_id: &str, body: &str) {
1682 let full = self.identity.fingerprint();
1683 let short = full.split('-').next().unwrap_or(full);
1684 let lower = body.to_lowercase();
1685 if lower.contains(&full.to_lowercase()) || lower.contains(&short.to_lowercase()) {
1686 let _ = self.app_event_tx.send(AppEvent::MentionReceived {
1687 room_id: room_id.to_string(),
1688 body: body.to_string(),
1689 });
1690 }
1691 }
1692
1693 fn decrypt_attachment(
1694 &self,
1695 room_id: &str,
1696 sender_fingerprint: &str,
1697 ciphertext: &[u8],
1698 meta: &EncryptedFileMeta,
1699 ) -> Result<Vec<u8>> {
1700 let mut rooms = self.active_rooms.lock().unwrap();
1701 let room = rooms
1702 .get_mut(room_id)
1703 .ok_or_else(|| HuddleError::Other("not in room".into()))?;
1704 let crypto = room
1705 .crypto
1706 .as_mut()
1707 .ok_or_else(|| HuddleError::Session("missing room crypto".into()))?;
1708 file_encryption::decrypt_file(ciphertext, meta, crypto, sender_fingerprint)
1709 }
1710}
1711
1712fn open_with_system(path: &str) -> Result<()> {
1714 #[cfg(target_os = "macos")]
1715 let cmd = "open";
1716 #[cfg(target_os = "linux")]
1717 let cmd = "xdg-open";
1718 #[cfg(target_os = "windows")]
1719 let cmd = "cmd";
1720 #[cfg(target_os = "windows")]
1721 let args = vec!["/C", "start", "", path];
1722 #[cfg(not(target_os = "windows"))]
1723 let args = vec![path];
1724
1725 std::process::Command::new(cmd)
1726 .args(args)
1727 .spawn()
1728 .map_err(|e| HuddleError::Other(format!("spawn opener: {e}")))?;
1729 Ok(())
1730}
1731
1732static ROOM_SALT_CACHE: std::sync::LazyLock<Mutex<HashMap<String, Vec<u8>>>> =
1735 std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
1736
1737#[allow(dead_code)]
1738fn salt_len() -> usize {
1739 SALT_LEN
1740}
1741
1742fn now_unix() -> i64 {
1743 SystemTime::now()
1744 .duration_since(UNIX_EPOCH)
1745 .unwrap()
1746 .as_secs() as i64
1747}
1748
1749#[cfg(test)]
1750mod parser_tests {
1751 use super::parse_dial_address;
1752
1753 #[test]
1754 fn parses_ipv4_port() {
1755 let m = parse_dial_address("10.3.72.53:9027").unwrap();
1756 assert_eq!(m.to_string(), "/ip4/10.3.72.53/tcp/9027");
1757 }
1758
1759 #[test]
1760 fn parses_bracketed_ipv6() {
1761 let m = parse_dial_address("[::1]:9027").unwrap();
1762 assert_eq!(m.to_string(), "/ip6/::1/tcp/9027");
1763 }
1764
1765 #[test]
1766 fn rejects_unbracketed_ipv6() {
1767 let err = parse_dial_address("fe80::1:9027").unwrap_err();
1768 assert!(err.to_string().contains("brackets"));
1769 }
1770
1771 #[test]
1772 fn passes_through_raw_multiaddr() {
1773 let m = parse_dial_address("/ip4/1.2.3.4/tcp/9000").unwrap();
1774 assert_eq!(m.to_string(), "/ip4/1.2.3.4/tcp/9000");
1775 }
1776
1777 #[test]
1778 fn empty_address_is_error() {
1779 assert!(parse_dial_address(" ").is_err());
1780 }
1781
1782 #[test]
1783 fn rejects_bad_port() {
1784 assert!(parse_dial_address("1.2.3.4:notaport").is_err());
1785 }
1786}