1use super::transport::{DeliveryReceipt, DeliveryStatus, ReceivedMessage};
3use super::types::*;
4use super::{DhtClient, KeyExchange, MessageStore, MessageTransport};
5use crate::control::ControlMessageHandler;
6use crate::identity::FourWordAddress;
7use crate::identity::restart::RestartManager;
8use crate::messaging::user_handle::UserHandle;
9use anyhow::Result;
10use chrono::{Duration, Utc};
11use std::collections::HashMap;
12use std::sync::Arc;
13use tokio::sync::{RwLock, broadcast};
14use tracing::{info, warn};
15
16pub async fn channel_recipients(_channel_id: &ChannelId) -> Result<Vec<FourWordAddress>> {
19 Ok(Vec::new())
41}
42
43pub struct MessagingService {
45 identity: FourWordAddress,
47 store: MessageStore,
49 transport: Arc<MessageTransport>,
51 key_exchange: Arc<KeyExchange>,
53 _dht_client: DhtClient,
55 event_tx: broadcast::Sender<ReceivedMessage>,
57 online_users: Arc<RwLock<HashMap<FourWordAddress, chrono::DateTime<Utc>>>>,
59}
60
61#[derive(Debug, Clone, Default)]
63pub struct SendOptions {
64 pub ephemeral: bool,
65 pub expiry_seconds: Option<u64>,
66 pub reply_to: Option<MessageId>,
67 pub thread_id: Option<ThreadId>,
68 pub attachments: Vec<Attachment>,
69}
70
71impl MessagingService {
72 pub async fn new(identity: FourWordAddress, dht_client: DhtClient) -> Result<Self> {
93 Self::new_with_config(identity, dht_client, super::NetworkConfig::default()).await
95 }
96
97 pub async fn new_with_config(
144 identity: FourWordAddress,
145 dht_client: DhtClient,
146 config: super::NetworkConfig,
147 ) -> Result<Self> {
148 match &config.nat_traversal {
150 Some(super::NatTraversalMode::P2PNode { concurrency_limit }) => {
151 info!(
152 "Initializing MessagingService with P2P NAT traversal (concurrency limit: {})",
153 concurrency_limit
154 );
155 }
156 Some(super::NatTraversalMode::ClientOnly) => {
157 info!("Initializing MessagingService with client-only NAT traversal");
158 }
159 None => {
160 warn!("Initializing MessagingService with NAT traversal disabled");
161 }
162 }
163
164 let store = MessageStore::new(dht_client.clone(), None).await?;
166
167 #[cfg(test)]
169 let network = Arc::new(crate::network::P2PNode::new_for_tests()?);
170
171 #[cfg(not(test))]
172 let network = {
173 let mut node_config = crate::network::NodeConfig::new()?;
175
176 match &config.port {
178 super::PortConfig::OsAssigned => {
179 let bind_addr = std::net::SocketAddr::new(
181 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
182 0, );
184 node_config.listen_addr = bind_addr;
185 node_config.listen_addrs = vec![bind_addr];
186 }
187 super::PortConfig::Explicit(port) => {
188 let bind_addr = match &config.ip_mode {
190 super::IpMode::IPv6Only => std::net::SocketAddr::new(
191 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
192 *port,
193 ),
194 _ => std::net::SocketAddr::new(
195 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
196 *port,
197 ),
198 };
199 node_config.listen_addr = bind_addr;
200 node_config.listen_addrs = vec![bind_addr];
201 }
202 super::PortConfig::Range(start, _end) => {
203 warn!(
206 "Port range configuration not fully supported yet, using port {}",
207 start
208 );
209 let bind_addr = std::net::SocketAddr::new(
210 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
211 *start,
212 );
213 node_config.listen_addr = bind_addr;
214 node_config.listen_addrs = vec![bind_addr];
215 }
216 }
217
218 match &config.ip_mode {
220 super::IpMode::IPv4Only => {
221 node_config.enable_ipv6 = false;
222 }
223 super::IpMode::IPv6Only => {
224 node_config.enable_ipv6 = true;
225 let port = node_config.listen_addr.port();
227 node_config.listen_addrs = vec![std::net::SocketAddr::new(
228 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
229 port,
230 )];
231 }
232 super::IpMode::DualStack => {
233 node_config.enable_ipv6 = true;
234 let port = node_config.listen_addr.port();
236 node_config.listen_addrs = vec![
237 std::net::SocketAddr::new(
238 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
239 port,
240 ),
241 std::net::SocketAddr::new(
242 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
243 port,
244 ),
245 ];
246 }
247 super::IpMode::DualStackSeparate {
248 ipv4_port,
249 ipv6_port,
250 } => {
251 node_config.enable_ipv6 = true;
253
254 let ipv4_port_num = match ipv4_port {
255 super::PortConfig::OsAssigned => 0,
256 super::PortConfig::Explicit(p) => *p,
257 super::PortConfig::Range(start, _) => *start,
258 };
259
260 let ipv6_port_num = match ipv6_port {
261 super::PortConfig::OsAssigned => 0,
262 super::PortConfig::Explicit(p) => *p,
263 super::PortConfig::Range(start, _) => *start,
264 };
265
266 node_config.listen_addrs = vec![
267 std::net::SocketAddr::new(
268 std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
269 ipv4_port_num,
270 ),
271 std::net::SocketAddr::new(
272 std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED),
273 ipv6_port_num,
274 ),
275 ];
276
277 node_config.listen_addr = node_config.listen_addrs[0];
279 }
280 }
281
282 let node = crate::network::P2PNode::new(node_config).await?;
283 Arc::new(node)
284 };
285 let transport = Arc::new(MessageTransport::new(network, dht_client.clone()).await?);
286 let key_exchange = Arc::new(KeyExchange::new(identity.clone(), dht_client.clone()).await?);
287
288 let (event_tx, _) = broadcast::channel(1000);
289
290 Ok(Self {
291 identity,
292 store,
293 transport,
294 key_exchange,
295 _dht_client: dht_client,
296 event_tx,
297 online_users: Arc::new(RwLock::new(HashMap::new())),
298 })
299 }
300
301 pub async fn enable_restart_management(&self, restart_manager: Arc<RestartManager>) {
305 let handler = Arc::new(ControlMessageHandler::new(restart_manager));
306 let events = self.transport.network().subscribe_events();
307 handler.start(events).await;
308 info!("Restart management enabled for MessagingService");
309 }
310
311 pub async fn send_message(
313 &self,
314 recipients: Vec<FourWordAddress>,
315 content: MessageContent,
316 channel_id: ChannelId,
317 options: SendOptions,
318 ) -> Result<(MessageId, DeliveryReceipt)> {
319 let mut message = RichMessage::new(
321 UserHandle::from(self.identity.to_string()),
322 channel_id,
323 content,
324 );
325
326 message.ephemeral = options.ephemeral;
328 if let Some(seconds) = options.expiry_seconds {
329 message.expires_at = Some(Utc::now() + Duration::seconds(seconds as i64));
330 }
331 message.reply_to = options.reply_to;
332 message.thread_id = options.thread_id;
333 message.attachments = options.attachments;
334
335 self.store.store_message(&message).await?;
337
338 let mut delivery_results = Vec::new();
340
341 for recipient in &recipients {
342 let encryption_key = match self.key_exchange.get_session_key(recipient).await {
344 Ok(key) => key,
345 Err(_) => {
346 info!("No session key for {}, initiating key exchange", recipient);
348 let kex_msg = self
349 .key_exchange
350 .initiate_exchange(recipient.clone())
351 .await?;
352
353 self.transport
355 .send_key_exchange_message(recipient, kex_msg)
356 .await?;
357
358 let wait_result = tokio::time::timeout(
360 tokio::time::Duration::from_secs(5),
361 self.wait_for_session_key(recipient),
362 )
363 .await;
364
365 match wait_result {
366 Ok(Ok(key)) => {
367 info!("Key exchange completed for {}", recipient);
368 key
369 }
370 Ok(Err(e)) => {
371 return Err(anyhow::anyhow!(
372 "Key exchange failed for {}: {}",
373 recipient,
374 e
375 ));
376 }
377 Err(_) => {
378 return Err(anyhow::anyhow!("Key exchange timeout for {}", recipient));
379 }
380 }
381 }
382 };
383
384 let encrypted = self
386 .encrypt_message_with_key(&message, &encryption_key)
387 .await?;
388
389 match self
391 .transport
392 .send_message(&encrypted, vec![recipient.clone()])
393 .await
394 {
395 Ok(_receipt) => {
396 delivery_results.push((recipient.clone(), DeliveryStatus::Queued));
397 }
398 Err(e) => {
399 warn!("Failed to send to {}: {}", recipient, e);
400 delivery_results
401 .push((recipient.clone(), DeliveryStatus::Failed(e.to_string())));
402 }
403 }
404 }
405
406 let receipt = DeliveryReceipt {
408 message_id: message.id,
409 timestamp: Utc::now(),
410 delivery_status: delivery_results,
411 };
412
413 info!(
414 "Sent message {} to {} recipients",
415 message.id,
416 recipients.len()
417 );
418
419 Ok((message.id, receipt))
420 }
421
422 pub async fn send_message_to_channel(
424 &self,
425 channel_id: ChannelId,
426 content: MessageContent,
427 options: SendOptions,
428 ) -> Result<(MessageId, DeliveryReceipt)> {
429 let recipients = channel_recipients(&channel_id).await?;
431
432 if recipients.is_empty() {
433 return Err(anyhow::anyhow!(
434 "No recipients found for channel {}",
435 channel_id
436 ));
437 }
438
439 self.send_message(recipients, content, channel_id, options)
441 .await
442 }
443
444 pub async fn subscribe_messages(
446 &self,
447 channel_filter: Option<ChannelId>,
448 ) -> broadcast::Receiver<ReceivedMessage> {
449 let rx = self.event_tx.subscribe();
450
451 let transport = self.transport.clone();
453 let event_tx = self.event_tx.clone();
454 let key_exchange = self.key_exchange.clone();
455 let store = self.store.clone();
456
457 tokio::spawn(async move {
459 let mut receiver = transport.receive_messages().await;
460
461 while let Ok(received) = receiver.recv().await {
462 if let Ok(decrypted) =
464 Self::decrypt_received_message(&received.message, &key_exchange).await
465 {
466 let _ = store.store_message(&decrypted).await;
468
469 if let Some(filter) = channel_filter
471 && decrypted.channel_id != filter
472 {
473 continue;
474 }
475
476 let _ = event_tx.send(ReceivedMessage {
478 message: received.message,
479 received_at: received.received_at,
480 });
481 }
482 }
483 });
484
485 let transport_kex = self.transport.clone();
487 let key_exchange_kex = self.key_exchange.clone();
488 tokio::spawn(async move {
489 let mut kex_receiver = transport_kex.subscribe_key_exchange();
490
491 while let Ok(kex_msg) = kex_receiver.recv().await {
492 use super::key_exchange::KeyExchangeType;
493
494 match kex_msg.message_type {
495 KeyExchangeType::Initiation => {
496 info!("Received key exchange initiation from {}", kex_msg.sender);
498 match key_exchange_kex.respond_to_exchange(kex_msg).await {
499 Ok(response) => {
500 let recipient = response.recipient.clone();
502 if let Err(e) = transport_kex
503 .send_key_exchange_message(&recipient, response)
504 .await
505 {
506 warn!(
507 "Failed to send key exchange response to {}: {}",
508 recipient, e
509 );
510 }
511 }
512 Err(e) => {
513 warn!("Failed to respond to key exchange: {}", e);
514 }
515 }
516 }
517 KeyExchangeType::Response => {
518 info!("Received key exchange response from {}", kex_msg.sender);
520 if let Err(e) = key_exchange_kex.complete_exchange(kex_msg).await {
521 warn!("Failed to complete key exchange: {}", e);
522 }
523 }
524 }
525 }
526 });
527
528 rx
529 }
530
531 pub async fn get_message_status(&self, message_id: MessageId) -> Result<DeliveryStatus> {
533 if let Ok(_msg) = self.store.get_message(message_id).await {
538 let online = self.online_users.read().await;
540 if !online.is_empty() {
541 Ok(DeliveryStatus::Delivered(Utc::now()))
542 } else {
543 Ok(DeliveryStatus::Queued)
544 }
545 } else {
546 Ok(DeliveryStatus::Failed("Message not found".to_string()))
547 }
548 }
549
550 pub async fn get_message(&self, message_id: MessageId) -> Result<RichMessage> {
552 self.store.get_message(message_id).await
553 }
554
555 pub async fn get_channel_messages(
565 &self,
566 channel_id: ChannelId,
567 limit: usize,
568 before: Option<chrono::DateTime<chrono::Utc>>,
569 ) -> Result<Vec<RichMessage>> {
570 self.store
571 .get_channel_messages(channel_id, limit, before)
572 .await
573 }
574
575 pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
583 self.store.get_thread_messages(thread_id).await
584 }
585
586 pub async fn mark_user_online(&self, user: FourWordAddress) -> Result<()> {
588 let mut online = self.online_users.write().await;
589 online.insert(user, Utc::now());
590 Ok(())
591 }
592
593 pub async fn mark_delivered(
595 &self,
596 message_id: MessageId,
597 recipient: FourWordAddress,
598 ) -> Result<()> {
599 if let Ok(mut msg) = self.store.get_message(message_id).await {
601 msg.delivered_to.insert(
602 crate::messaging::user_resolver::resolve_handle(&recipient),
603 Utc::now(),
604 );
605 self.store.update_message(&msg).await?;
606 }
607 Ok(())
608 }
609
610 pub async fn process_message_queue(&self) -> Result<()> {
612 self.transport.process_message_queue().await;
614 Ok(())
615 }
616
617 pub async fn encrypt_message(
619 &self,
620 recipient: FourWordAddress,
621 channel_id: ChannelId,
622 content: MessageContent,
623 ) -> Result<EncryptedMessage> {
624 let message = RichMessage::new(
625 UserHandle::from(self.identity.to_string()),
626 channel_id,
627 content,
628 );
629
630 let key = self
632 .key_exchange
633 .get_session_key(&recipient)
634 .await
635 .unwrap_or_else(|_| vec![0u8; 32]); self.encrypt_message_with_key(&message, &key).await
638 }
639
640 pub async fn decrypt_message(&self, encrypted: EncryptedMessage) -> Result<RichMessage> {
642 Self::decrypt_received_message(&encrypted, &self.key_exchange).await
643 }
644
645 async fn encrypt_message_with_key(
647 &self,
648 message: &RichMessage,
649 key: &[u8],
650 ) -> Result<EncryptedMessage> {
651 use saorsa_pqc::{ChaCha20Poly1305Cipher, SymmetricKey};
652
653 let plaintext = serde_json::to_vec(message)?;
654 let mut k = [0u8; 32];
655 if key.len() != 32 {
656 return Err(anyhow::anyhow!("Invalid session key length"));
657 }
658 k.copy_from_slice(&key[..32]);
659 let sk = SymmetricKey::from_bytes(k);
660 let cipher = ChaCha20Poly1305Cipher::new(&sk);
661 let (ciphertext, nonce) = cipher
662 .encrypt(&plaintext, None)
663 .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
664
665 Ok(EncryptedMessage {
666 id: message.id,
667 channel_id: message.channel_id,
668 sender: self.identity.clone(),
669 ciphertext,
670 nonce: nonce.to_vec(),
671 key_id: format!("key_{}", self.identity),
672 })
673 }
674
675 async fn decrypt_received_message(
677 encrypted: &EncryptedMessage,
678 key_exchange: &Arc<KeyExchange>,
679 ) -> Result<RichMessage> {
680 use saorsa_pqc::{ChaCha20Poly1305Cipher, SymmetricKey};
681
682 let key = key_exchange
684 .get_session_key(&encrypted.sender)
685 .await
686 .map_err(|e| anyhow::anyhow!("No session key for {}: {}", encrypted.sender, e))?;
687 if key.len() != 32 {
688 return Err(anyhow::anyhow!("Invalid session key length"));
689 }
690 let mut k = [0u8; 32];
691 k.copy_from_slice(&key[..32]);
692 let sk = SymmetricKey::from_bytes(k);
693 let cipher = ChaCha20Poly1305Cipher::new(&sk);
694 if encrypted.nonce.len() != 12 {
696 return Err(anyhow::anyhow!(
697 "Invalid nonce length: expected 12, got {}",
698 encrypted.nonce.len()
699 ));
700 }
701 let mut nonce_array = [0u8; 12];
702 nonce_array.copy_from_slice(&encrypted.nonce);
703
704 let plaintext = cipher
705 .decrypt(&encrypted.ciphertext, &nonce_array, None)
706 .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
707
708 let message: RichMessage = serde_json::from_slice(&plaintext)?;
710
711 Ok(message)
712 }
713
714 pub async fn listen_addrs(&self) -> Vec<std::net::SocketAddr> {
718 self.transport.listen_addrs().await
719 }
720
721 pub async fn connected_peers(&self) -> Vec<String> {
723 self.transport
724 .connected_peers()
725 .await
726 .into_iter()
727 .map(|peer_id| peer_id.to_string())
728 .collect()
729 }
730
731 pub async fn peer_count(&self) -> usize {
733 self.transport.peer_count().await
734 }
735
736 pub async fn is_running(&self) -> bool {
738 true
741 }
742
743 pub async fn connect_peer(&self, address: &str) -> Result<String> {
751 let peer_id = self.transport.connect_peer(address).await?;
752 Ok(peer_id.to_string())
753 }
754
755 pub async fn disconnect_peer(&self, peer_id: &str) -> Result<()> {
760 let peer_id_parsed = peer_id
762 .parse()
763 .map_err(|e| anyhow::anyhow!("Invalid peer ID: {}", e))?;
764 self.transport.disconnect_peer(&peer_id_parsed).await
765 }
766
767 async fn wait_for_session_key(&self, peer: &FourWordAddress) -> Result<Vec<u8>> {
772 let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
774 let max_attempts = 50; for _ in 0..max_attempts {
777 interval.tick().await;
778
779 if let Ok(key) = self.key_exchange.get_session_key(peer).await {
780 return Ok(key);
781 }
782 }
783
784 Err(anyhow::anyhow!(
785 "Session key not established within timeout"
786 ))
787 }
788
789 #[cfg(test)]
791 pub fn create_test_message(
792 &self,
793 sender: UserHandle,
794 channel_id: ChannelId,
795 content: MessageContent,
796 ) -> RichMessage {
797 RichMessage::new(sender, channel_id, content)
798 }
799
800 #[cfg(test)]
801 pub async fn inject_test_message(&self, message: RichMessage) -> Result<()> {
802 self.store.store_message(&message).await?;
803
804 let encrypted = EncryptedMessage {
806 id: message.id,
807 channel_id: message.channel_id,
808 sender: self.identity.clone(),
809 ciphertext: vec![],
810 nonce: vec![],
811 key_id: "test".to_string(),
812 };
813
814 let _ = self.event_tx.send(ReceivedMessage {
815 message: encrypted,
816 received_at: Utc::now(),
817 });
818
819 Ok(())
820 }
821}
822
823