1use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::SystemTime;
16
17use anyhow::{anyhow, Context, Result};
18use bytes::Bytes;
19use pim_core::NodeId;
20use pim_crypto::{e2e_decrypt_in_place, e2e_encrypt};
21use pim_plugin::{ControlSender, IdentitySecrets, PeerDirectory};
22use pim_protocol::ControlFrame;
23use serde::{Deserialize, Serialize};
24use tokio::sync::broadcast;
25use tracing::{debug, warn};
26
27use crate::storage::{
28 AckKind, ConversationSummary, MessageDirection, MessageRecord, MessageStatus, MessagingStorage,
29};
30use crate::wire::{decode_ack, decode_message, encode_ack, encode_message, KIND_ACK, KIND_MESSAGE};
31use crate::{hex16, hex_node_id};
32
33pub const MAX_BODY_BYTES: usize = 8 * 1024;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
39#[serde(rename_all = "snake_case")]
40pub enum HistoryScope {
41 Peer,
43 All,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
50#[serde(tag = "kind", rename_all = "snake_case")]
51pub enum MessageEvent {
52 MessageReceived {
54 message: Box<MessageRecord>,
56 conversation: Box<ConversationSummary>,
58 },
59 MessageStatus {
62 message_id: String,
64 peer_node_id: String,
66 new_status: MessageStatus,
68 at_ms: i64,
70 },
71 HistoryCleared {
74 peer_node_id: Option<String>,
76 scope: HistoryScope,
78 deleted_messages: i64,
80 },
81}
82
83pub struct MessagingService {
85 storage: Arc<MessagingStorage>,
86 events_tx: broadcast::Sender<MessageEvent>,
87 peers: Arc<dyn PeerDirectory>,
88 control: Arc<dyn ControlSender>,
89 identity: Arc<dyn IdentitySecrets>,
90}
91
92impl MessagingService {
93 pub fn open(
95 db_path: PathBuf,
96 peers: Arc<dyn PeerDirectory>,
97 control: Arc<dyn ControlSender>,
98 identity: Arc<dyn IdentitySecrets>,
99 ) -> Result<Self> {
100 let storage = Arc::new(MessagingStorage::open(db_path)?);
101 let (events_tx, _rx) = broadcast::channel(256);
102 Ok(Self {
103 storage,
104 events_tx,
105 peers,
106 control,
107 identity,
108 })
109 }
110
111 pub fn subscribe(&self) -> broadcast::Receiver<MessageEvent> {
114 self.events_tx.subscribe()
115 }
116
117 pub fn storage(&self) -> &Arc<MessagingStorage> {
120 &self.storage
121 }
122
123 pub async fn list_conversations(&self) -> Result<Vec<ConversationSummary>> {
127 let storage = self.storage.clone();
128 let mut rows = tokio::task::spawn_blocking(move || storage.list_conversations_raw())
129 .await
130 .context("storage join")??;
131 for row in rows.iter_mut() {
132 let peer = match parse_node_id(&row.peer_node_id) {
133 Some(p) => p,
134 None => continue,
135 };
136 if let Some(name) = self.peers.lookup_name(&peer).await {
137 if !name.is_empty() {
138 row.name = name;
139 }
140 }
141 if let Some(x25519) = self.peers.lookup_x25519(&peer).await {
142 row.x25519_pubkey = Some(hex32(&x25519));
143 }
144 }
145 Ok(rows)
146 }
147
148 pub async fn record_local_send(
150 &self,
151 peer: NodeId,
152 message_id: [u8; 16],
153 body: String,
154 timestamp_ms: i64,
155 ) -> Result<MessageRecord> {
156 let storage = self.storage.clone();
157 let peer_id_hex = hex_node_id(&peer);
158 let message_id_hex = hex16(&message_id);
159
160 let record = MessageRecord {
161 id: message_id_hex.clone(),
162 peer_node_id: peer_id_hex.clone(),
163 direction: MessageDirection::Sent,
164 body,
165 timestamp_ms,
166 status: MessageStatus::Pending,
167 failure_reason: None,
168 delivered_at_ms: None,
169 read_at_ms: None,
170 };
171 let record_clone = record.clone();
172 tokio::task::spawn_blocking(move || -> Result<()> {
173 storage.insert_message(&record_clone)?;
174 storage.bump_conversation_after_local_send(
175 &peer_id_hex,
176 &message_id_hex,
177 timestamp_ms,
178 &record_clone.body,
179 )?;
180 Ok(())
181 })
182 .await??;
183
184 Ok(record)
185 }
186
187 pub async fn mark_sent(&self, peer: NodeId, message_id: [u8; 16], at_ms: i64) -> Result<()> {
189 self.set_status(peer, message_id, MessageStatus::Sent, None, None, at_ms)
190 .await
191 }
192
193 pub async fn mark_delivered(
195 &self,
196 peer: NodeId,
197 message_id: [u8; 16],
198 at_ms: i64,
199 ) -> Result<()> {
200 self.set_status(
201 peer,
202 message_id,
203 MessageStatus::Delivered,
204 Some(at_ms),
205 None,
206 at_ms,
207 )
208 .await
209 }
210
211 pub async fn mark_read(&self, peer: NodeId, message_id: [u8; 16], at_ms: i64) -> Result<()> {
213 self.set_status(
214 peer,
215 message_id,
216 MessageStatus::Read,
217 None,
218 Some(at_ms),
219 at_ms,
220 )
221 .await
222 }
223
224 pub async fn mark_failed(
226 &self,
227 peer: NodeId,
228 message_id: [u8; 16],
229 reason: String,
230 at_ms: i64,
231 ) -> Result<()> {
232 let storage = self.storage.clone();
233 let id_hex = hex16(&message_id);
234 let peer_hex = hex_node_id(&peer);
235 let storage_id = id_hex.clone();
236 let storage_reason = reason.clone();
237 tokio::task::spawn_blocking(move || -> Result<()> {
238 storage.set_message_failed(&storage_id, &storage_reason, at_ms)
239 })
240 .await??;
241
242 let _ = self.events_tx.send(MessageEvent::MessageStatus {
243 message_id: id_hex,
244 peer_node_id: peer_hex,
245 new_status: MessageStatus::Failed,
246 at_ms,
247 });
248 Ok(())
249 }
250
251 async fn set_status(
252 &self,
253 peer: NodeId,
254 message_id: [u8; 16],
255 status: MessageStatus,
256 delivered_at_ms: Option<i64>,
257 read_at_ms: Option<i64>,
258 at_ms: i64,
259 ) -> Result<()> {
260 let storage = self.storage.clone();
261 let id_hex = hex16(&message_id);
262 let peer_hex = hex_node_id(&peer);
263 let storage_id = id_hex.clone();
264 tokio::task::spawn_blocking(move || -> Result<()> {
265 storage.set_message_status(&storage_id, status, delivered_at_ms, read_at_ms)
266 })
267 .await??;
268
269 let _ = self.events_tx.send(MessageEvent::MessageStatus {
270 message_id: id_hex,
271 peer_node_id: peer_hex,
272 new_status: status,
273 at_ms,
274 });
275 Ok(())
276 }
277
278 pub async fn delete_conversation(&self, peer: NodeId) -> Result<(usize, bool)> {
280 let storage = self.storage.clone();
281 let peer_hex = hex_node_id(&peer);
282 let peer_hex_for_storage = peer_hex.clone();
283 let outcome = tokio::task::spawn_blocking(move || -> Result<(usize, bool)> {
284 storage.delete_conversation(&peer_hex_for_storage)
285 })
286 .await??;
287 let _ = self.events_tx.send(MessageEvent::HistoryCleared {
288 peer_node_id: Some(peer_hex),
289 scope: HistoryScope::Peer,
290 deleted_messages: outcome.0 as i64,
291 });
292 Ok(outcome)
293 }
294
295 pub async fn delete_all_messages(&self) -> Result<(usize, usize)> {
297 let storage = self.storage.clone();
298 let outcome = tokio::task::spawn_blocking(move || -> Result<(usize, usize)> {
299 storage.delete_all_messages()
300 })
301 .await??;
302 let _ = self.events_tx.send(MessageEvent::HistoryCleared {
303 peer_node_id: None,
304 scope: HistoryScope::All,
305 deleted_messages: outcome.0 as i64,
306 });
307 Ok(outcome)
308 }
309
310 pub async fn send(&self, peer: NodeId, body: String) -> Result<MessageRecord> {
315 if body.len() > MAX_BODY_BYTES {
316 return Err(anyhow!("message body exceeds {MAX_BODY_BYTES} bytes"));
317 }
318
319 let recipient_x25519 = match self.peers.lookup_x25519(&peer).await {
320 Some(k) => k,
321 None => {
322 return Err(anyhow!(
323 "no x25519 public key cached for {peer}; wait until peer comes online and re-issues PeerInfo"
324 ))
325 }
326 };
327
328 let mut id_bytes = [0u8; 16];
329 id_bytes.copy_from_slice(uuid::Uuid::new_v4().as_bytes());
330 let timestamp_ms = now_ms();
331
332 let record = self
333 .record_local_send(peer, id_bytes, body.clone(), timestamp_ms)
334 .await?;
335
336 let ciphertext = e2e_encrypt(body.as_bytes(), &recipient_x25519)
337 .map_err(|e| anyhow!("e2e_encrypt failed: {e}"))?;
338
339 let body_bytes = encode_message(id_bytes, timestamp_ms as u64, &ciphertext);
340 let frame = ControlFrame::PluginPayload {
341 kind: KIND_MESSAGE.into(),
342 body: body_bytes,
343 };
344
345 let sent = self.control.send_routed(peer, frame).await;
346 if sent {
347 let _ = self.mark_sent(peer, id_bytes, now_ms()).await;
348 } else {
349 let _ = self
350 .mark_failed(peer, id_bytes, "no_route".into(), now_ms())
351 .await;
352 }
353
354 Ok(record)
355 }
356
357 pub async fn handle_incoming_message(&self, src: NodeId, body: Bytes) {
359 let decoded = match decode_message(&body) {
360 Ok(d) => d,
361 Err(e) => {
362 warn!(%src, "messaging: malformed messaging.msg: {e}");
363 return;
364 }
365 };
366
367 let identity_seed = self.identity.signing_seed();
368 let mut buffer = decoded.ciphertext;
369 if let Err(e) = e2e_decrypt_in_place(&mut buffer, &identity_seed) {
370 warn!(%src, "messaging: ECIES decrypt failed: {e}");
371 return;
372 }
373 let plaintext = match String::from_utf8(buffer) {
374 Ok(s) => s,
375 Err(_) => {
376 warn!(%src, "messaging: payload not valid UTF-8");
377 return;
378 }
379 };
380
381 let received_at = now_ms();
382 let cached_name = self.peers.lookup_name(&src).await;
383 let cached_x25519 = self.peers.lookup_x25519(&src).await;
384
385 let storage = self.storage.clone();
386 let peer_id_hex = hex_node_id(&src);
387 let message_id_hex = hex16(&decoded.message_id);
388 let timestamp_ms = decoded.timestamp_ms as i64;
389
390 let record = MessageRecord {
391 id: message_id_hex.clone(),
392 peer_node_id: peer_id_hex.clone(),
393 direction: MessageDirection::Received,
394 body: plaintext.clone(),
395 timestamp_ms,
396 status: MessageStatus::Delivered,
397 failure_reason: None,
398 delivered_at_ms: Some(received_at),
399 read_at_ms: None,
400 };
401 let record_clone = record.clone();
402 let preview_source = plaintext;
403
404 let bump_result =
405 tokio::task::spawn_blocking(move || -> Result<crate::storage::ConversationBump> {
406 storage.insert_message(&record_clone)?;
407 storage.bump_conversation_after_remote_receive(
408 &peer_id_hex,
409 &message_id_hex,
410 timestamp_ms,
411 &preview_source,
412 )
413 })
414 .await;
415
416 let bump = match bump_result {
417 Ok(Ok(b)) => b,
418 Ok(Err(e)) => {
419 warn!(%src, "messaging: storage write failed: {e}");
420 return;
421 }
422 Err(e) => {
423 warn!(%src, "messaging: storage join failed: {e}");
424 return;
425 }
426 };
427
428 let peer_hex = hex_node_id(&src);
429 let short = crate::storage::short_id(&peer_hex);
430 let conversation = ConversationSummary {
431 peer_node_id: peer_hex,
432 peer_node_id_short: short.clone(),
433 name: cached_name
434 .filter(|n| !n.is_empty())
435 .unwrap_or_else(|| short.clone()),
436 last_message_preview: Some(bump.preview),
437 last_message_ts_ms: Some(timestamp_ms),
438 unread_count: bump.unread_count,
439 x25519_pubkey: cached_x25519.map(|k| hex32(&k)),
440 };
441
442 let _ = self.events_tx.send(MessageEvent::MessageReceived {
443 message: Box::new(record),
444 conversation: Box::new(conversation),
445 });
446
447 let ack = ControlFrame::PluginPayload {
449 kind: KIND_ACK.into(),
450 body: encode_ack(decoded.message_id, AckKind::Delivered as u8),
451 };
452 let _ = self.control.send_routed(src, ack).await;
453 debug!(%src, "messaging: stored received + acked delivered");
454 }
455
456 pub async fn handle_incoming_message_ack(&self, src: NodeId, body: Bytes) {
458 let decoded = match decode_ack(&body) {
459 Ok(d) => d,
460 Err(e) => {
461 warn!(%src, "messaging: malformed messaging.ack: {e}");
462 return;
463 }
464 };
465 let kind = match AckKind::from_u8(decoded.ack_kind) {
466 Some(k) => k,
467 None => {
468 warn!(%src, "messaging: ignoring ack with unknown kind {}", decoded.ack_kind);
469 return;
470 }
471 };
472 let now = now_ms();
473 match kind {
474 AckKind::Delivered => {
475 let _ = self.mark_delivered(src, decoded.message_id, now).await;
476 }
477 AckKind::Read => {
478 let _ = self.mark_read(src, decoded.message_id, now).await;
479 }
480 }
481 }
482
483 pub async fn on_peer_forgotten(&self, peer: NodeId) {
487 let storage = self.storage.clone();
488 let peer_hex = hex_node_id(&peer);
489 let peer_hex_for_storage = peer_hex.clone();
490 let outcome = tokio::task::spawn_blocking(move || -> Result<(usize, bool)> {
491 storage.delete_conversation(&peer_hex_for_storage)
492 })
493 .await;
494
495 let outcome = match outcome {
496 Ok(Ok(o)) => o,
497 Ok(Err(e)) => {
498 warn!(%peer, "messaging: on_peer_forgotten storage failed: {e}");
499 return;
500 }
501 Err(e) => {
502 warn!(%peer, "messaging: on_peer_forgotten join failed: {e}");
503 return;
504 }
505 };
506
507 if outcome.0 > 0 || outcome.1 {
508 let _ = self.events_tx.send(MessageEvent::HistoryCleared {
509 peer_node_id: Some(peer_hex),
510 scope: HistoryScope::Peer,
511 deleted_messages: outcome.0 as i64,
512 });
513 }
514 }
515}
516
517pub fn now_ms() -> i64 {
520 SystemTime::now()
521 .duration_since(SystemTime::UNIX_EPOCH)
522 .map(|d| d.as_millis() as i64)
523 .unwrap_or(0)
524}
525
526fn hex32(bytes: &[u8; 32]) -> String {
527 let mut out = String::with_capacity(64);
528 for b in bytes {
529 out.push_str(&format!("{b:02x}"));
530 }
531 out
532}
533
534fn parse_node_id(hex: &str) -> Option<NodeId> {
535 if hex.len() != 32 {
536 return None;
537 }
538 let mut bytes = [0u8; 16];
539 for i in 0..16 {
540 let pair = &hex[i * 2..i * 2 + 2];
541 bytes[i] = u8::from_str_radix(pair, 16).ok()?;
542 }
543 Some(NodeId::from_bytes(bytes))
544}