1use std::path::Path;
2use std::sync::{Arc, Mutex};
3
4use nostr::{Event, Filter, Keys, PublicKey, UnsignedEvent};
5use rusqlite::{params, Connection, OptionalExtension};
6
7use crate::{
8 is_app_keys_event, AppKeys, ProtocolDecryptedMessage, ProtocolEffect, ProtocolEngine,
9 ProtocolRetryBatch, SharedConnection, SqliteStorageAdapter, UnixSeconds, APP_KEYS_EVENT_KIND,
10 CHAT_MESSAGE_KIND, INVITE_EVENT_KIND, INVITE_RESPONSE_KIND, MESSAGE_EVENT_KIND,
11};
12
/// SQLite DDL executed by `DirectMessageService::ensure_schema`.
///
/// Every statement uses `IF NOT EXISTS`, so the batch can be re-run against a
/// database that already contains these objects. It declares:
/// - `private_chat_threads`: one row per chat, keyed by `chat_id`.
/// - `private_chat_messages`: messages keyed by `(chat_id, id)`, with an index
///   for per-chat chronological reads and a partial unique index on
///   `source_event_id` (only rows where it is non-NULL).
/// - `private_chat_seen_events`: ids of events that were already processed.
/// - `ndr_kv`: key/value rows scoped by owner and device public key.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS private_chat_threads (
    chat_id TEXT PRIMARY KEY,
    display_name TEXT NOT NULL,
    avatar_seed TEXT NOT NULL,
    updated_at_secs INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE IF NOT EXISTS private_chat_messages (
    chat_id TEXT NOT NULL,
    id TEXT NOT NULL,
    body TEXT NOT NULL,
    is_outgoing INTEGER NOT NULL,
    created_at_secs INTEGER NOT NULL,
    delivery TEXT NOT NULL,
    source_event_id TEXT,
    PRIMARY KEY (chat_id, id)
);

CREATE INDEX IF NOT EXISTS private_chat_recent_idx
    ON private_chat_messages(chat_id, created_at_secs, id);

CREATE UNIQUE INDEX IF NOT EXISTS private_chat_source_event_idx
    ON private_chat_messages(source_event_id)
    WHERE source_event_id IS NOT NULL;

CREATE TABLE IF NOT EXISTS private_chat_seen_events (
    event_id TEXT PRIMARY KEY
);

CREATE TABLE IF NOT EXISTS ndr_kv (
    owner_pubkey_hex TEXT NOT NULL,
    device_pubkey_hex TEXT NOT NULL,
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    PRIMARY KEY (owner_pubkey_hex, device_pubkey_hex, key)
);
"#;
51
/// Delivery state of a stored direct message.
///
/// The state is persisted as a lowercase text label (see [`Self::as_str`]) in
/// the `delivery` column of `private_chat_messages`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DirectMessageDelivery {
    Pending,
    Sent,
    Received,
    Failed,
}

impl DirectMessageDelivery {
    /// Returns the text label used when this state is written to storage.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Failed => "failed",
            Self::Received => "received",
            Self::Sent => "sent",
            Self::Pending => "pending",
        }
    }

    /// Parses a stored label back into a delivery state.
    ///
    /// Matching is exact and case-sensitive; any label that is not one of
    /// `"sent"`, `"received"` or `"failed"` yields [`Self::Pending`].
    fn from_str(value: &str) -> Self {
        for known in [Self::Sent, Self::Received, Self::Failed] {
            if known.as_str() == value {
                return known;
            }
        }
        Self::Pending
    }
}
79
80#[derive(Clone, Debug, PartialEq, Eq)]
81pub struct DirectMessageSnapshot {
82 pub id: String,
83 pub chat_id: String,
84 pub body: String,
85 pub is_outgoing: bool,
86 pub created_at_secs: u64,
87 pub delivery: DirectMessageDelivery,
88}
89
/// Summary of a chat as shown in a chat list.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DirectChatSnapshot {
    /// Id of the chat.
    pub chat_id: String,
    /// Body of the most recent message, or an empty string when there is none.
    pub last_message_preview: String,
    /// Timestamp (seconds since the Unix epoch) used to order chats.
    pub last_message_at: u64,
    /// Number of unread messages. The code in this file always fills in `0`.
    pub unread_count: u32,
}
97
98#[derive(Clone, Debug, PartialEq, Eq)]
99pub struct DirectThreadSnapshot {
100 pub chat: DirectChatSnapshot,
101 pub messages: Vec<DirectMessageSnapshot>,
102}
103
104#[derive(Clone, Debug)]
105pub enum DirectMessageCommand {
106 Publish(Event),
107 Subscribe {
108 subscription_id: String,
109 filters: Vec<Filter>,
110 durable: bool,
111 },
112}
113
114pub struct DirectMessageService {
115 conn: SharedConnection,
116 protocol_engine: Option<ProtocolEngine>,
117 owner_public_key: Option<PublicKey>,
118 relay_subscription_key: Option<String>,
119 fetch_subscription_counter: u64,
120 last_error: Option<String>,
121}
122
123impl DirectMessageService {
124 pub fn memory() -> Self {
125 let service = Self {
126 conn: Arc::new(Mutex::new(Connection::open_in_memory().unwrap())),
127 protocol_engine: None,
128 owner_public_key: None,
129 relay_subscription_key: None,
130 fetch_subscription_counter: 0,
131 last_error: None,
132 };
133 service.ensure_schema();
134 service
135 }
136
137 pub fn open(data_dir: &Path, owner_keys: Option<&Keys>) -> Self {
138 let path = data_dir.join("private-chat.sqlite3");
139 let conn = Connection::open(path).or_else(|_| Connection::open_in_memory());
140 let conn = match conn {
141 Ok(conn) => conn,
142 Err(error) => {
143 return Self {
144 conn: Arc::new(Mutex::new(Connection::open_in_memory().unwrap())),
145 protocol_engine: None,
146 owner_public_key: None,
147 relay_subscription_key: None,
148 fetch_subscription_counter: 0,
149 last_error: Some(format!("Direct message store open failed: {error}")),
150 };
151 }
152 };
153 let service = Self {
154 conn: Arc::new(Mutex::new(conn)),
155 protocol_engine: None,
156 owner_public_key: None,
157 relay_subscription_key: None,
158 fetch_subscription_counter: 0,
159 last_error: None,
160 };
161 service.ensure_schema();
162 if let Some(keys) = owner_keys {
163 service.with_protocol_engine(keys)
164 } else {
165 service
166 }
167 }
168
169 pub fn activate(&mut self, keys: &Keys) -> Vec<DirectMessageCommand> {
170 let next = Self {
171 conn: Arc::clone(&self.conn),
172 protocol_engine: None,
173 owner_public_key: None,
174 relay_subscription_key: self.relay_subscription_key.clone(),
175 fetch_subscription_counter: self.fetch_subscription_counter,
176 last_error: self.last_error.clone(),
177 }
178 .with_protocol_engine(keys);
179 self.protocol_engine = next.protocol_engine;
180 self.owner_public_key = next.owner_public_key;
181 self.protocol_subscription_commands()
182 }
183
184 pub fn last_error(&self) -> Option<String> {
185 self.last_error.clone()
186 }
187
188 pub fn chats(&self) -> Vec<DirectChatSnapshot> {
189 let Ok(conn) = self.conn.lock() else {
190 return Vec::new();
191 };
192 let mut stmt = match conn.prepare(
193 "SELECT t.chat_id,
194 COALESCE(m.body, ''), COALESCE(m.created_at_secs, t.updated_at_secs)
195 FROM private_chat_threads t
196 LEFT JOIN private_chat_messages m
197 ON m.chat_id = t.chat_id
198 AND m.created_at_secs = (
199 SELECT MAX(created_at_secs)
200 FROM private_chat_messages
201 WHERE chat_id = t.chat_id
202 )
203 ORDER BY COALESCE(m.created_at_secs, t.updated_at_secs) DESC, t.chat_id ASC",
204 ) {
205 Ok(stmt) => stmt,
206 Err(_) => return Vec::new(),
207 };
208 let rows = match stmt.query_map([], |row| {
209 Ok(DirectChatSnapshot {
210 chat_id: row.get(0)?,
211 last_message_preview: row.get(1)?,
212 last_message_at: row.get::<_, i64>(2)?.max(0) as u64,
213 unread_count: 0,
214 })
215 }) {
216 Ok(rows) => rows,
217 Err(_) => return Vec::new(),
218 };
219 rows.filter_map(Result::ok).collect()
220 }
221
222 pub fn thread(&self, chat_id: &str) -> Option<DirectThreadSnapshot> {
223 let chat_id = normalize_pubkey(chat_id).ok()?;
224 let chat = self
225 .chats()
226 .into_iter()
227 .find(|chat| chat.chat_id == chat_id)
228 .unwrap_or_else(|| chat_snapshot_for_pubkey(&chat_id));
229 let messages = self.messages(&chat_id, 160);
230 Some(DirectThreadSnapshot { chat, messages })
231 }
232
233 pub fn open_chat(
234 &mut self,
235 peer_input: &str,
236 _keys: &Keys,
237 ) -> Result<(DirectThreadSnapshot, Vec<DirectMessageCommand>), String> {
238 let public_key = PublicKey::parse(peer_input).map_err(|error| error.to_string())?;
239 let chat_id = public_key.to_hex();
240 self.ensure_thread(&chat_id, unix_now());
241 let commands = self.protocol_subscription_commands();
242 let thread = self
243 .thread(&chat_id)
244 .ok_or_else(|| "Chat open failed".to_string())?;
245 Ok((thread, commands))
246 }
247
248 pub fn send_message(
249 &mut self,
250 chat_id: &str,
251 body: &str,
252 _keys: &Keys,
253 ) -> Result<Vec<DirectMessageCommand>, String> {
254 let body = body.trim();
255 if body.is_empty() {
256 return Ok(Vec::new());
257 }
258 let public_key = PublicKey::parse(chat_id).map_err(|error| error.to_string())?;
259 let chat_id = public_key.to_hex();
260 self.ensure_thread(&chat_id, unix_now());
261 let engine = self
262 .protocol_engine
263 .as_mut()
264 .ok_or_else(|| "Direct message runtime is not ready".to_string())?;
265 let result = engine
266 .send_direct_text(public_key, &chat_id, body, None, UnixSeconds(unix_now()))
267 .map_err(|error| error.to_string())?;
268 let delivery = if result.event_ids.is_empty() {
269 DirectMessageDelivery::Pending
270 } else {
271 DirectMessageDelivery::Sent
272 };
273 self.insert_message(
274 &chat_id,
275 &result.message_id,
276 body,
277 true,
278 unix_now(),
279 delivery,
280 None,
281 );
282 Ok(self.commands_from_effects(result.effects))
283 }
284
285 pub fn process_event(&mut self, event: Event, _keys: &Keys) -> Vec<DirectMessageCommand> {
286 let event_id = event.id.to_hex();
287 if self.seen_event(&event_id) {
288 return Vec::new();
289 }
290 let Some(engine) = self.protocol_engine.as_mut() else {
291 return Vec::new();
292 };
293 let kind = event.kind.as_u16() as u32;
294 let mut effects = Vec::new();
295 let mut retry_batch = ProtocolRetryBatch::default();
296 let mut decrypted = None;
297
298 let processed = match kind {
299 APP_KEYS_EVENT_KIND if is_app_keys_event(&event) => match AppKeys::from_event(&event) {
300 Ok(app_keys) => match engine.ingest_app_keys_snapshot(
301 event.pubkey,
302 app_keys,
303 event.created_at.as_secs(),
304 ) {
305 Ok(batch) => {
306 retry_batch = batch;
307 true
308 }
309 Err(error) => {
310 self.last_error = Some(format!("Direct message app keys failed: {error}"));
311 false
312 }
313 },
314 Err(_) => false,
315 },
316 INVITE_EVENT_KIND => match engine.observe_invite_event(&event) {
317 Ok(batch) => {
318 retry_batch = batch;
319 true
320 }
321 Err(_) => false,
322 },
323 INVITE_RESPONSE_KIND => match engine.observe_invite_response_event(&event) {
324 Ok(batch) => {
325 retry_batch = batch;
326 true
327 }
328 Err(_) => false,
329 },
330 MESSAGE_EVENT_KIND => match engine.process_direct_message_event(&event) {
331 Ok(message) => {
332 decrypted = message;
333 true
334 }
335 Err(_) => false,
336 },
337 _ => false,
338 };
339
340 if !processed {
341 return Vec::new();
342 }
343 self.mark_seen_event(&event_id);
344 if let Some(message) = decrypted {
345 self.apply_decrypted_protocol_message(message);
346 }
347 effects.extend(self.effects_from_retry_batch(retry_batch));
348 self.commands_from_effects(effects)
349 }
350
351 pub fn mobile_push_message_author_pubkeys(&self) -> Vec<String> {
352 let Some(engine) = self.protocol_engine.as_ref() else {
353 return Vec::new();
354 };
355 let mut authors = engine
356 .known_message_author_pubkeys()
357 .into_iter()
358 .map(|pubkey| pubkey.to_hex())
359 .collect::<Vec<_>>();
360 authors.sort();
361 authors.dedup();
362 authors
363 }
364
365 fn subscription_command(&mut self) -> Option<DirectMessageCommand> {
366 let engine = self.protocol_engine.as_ref()?;
367 let authors = engine
368 .known_message_author_pubkeys()
369 .into_iter()
370 .chain(self.owner_public_key)
371 .collect::<Vec<_>>();
372 let mut author_hexes = authors.iter().map(PublicKey::to_hex).collect::<Vec<_>>();
373 author_hexes.sort();
374 author_hexes.dedup();
375 let key = author_hexes.join(",");
376 if key.is_empty() || self.relay_subscription_key.as_deref() == Some(key.as_str()) {
377 return None;
378 }
379 self.relay_subscription_key = Some(key);
380
381 let public_keys = author_hexes
382 .iter()
383 .filter_map(|hex| PublicKey::parse(hex).ok())
384 .collect::<Vec<_>>();
385 let filter = Filter::new()
386 .authors(public_keys)
387 .kinds([
388 nostr::Kind::from(MESSAGE_EVENT_KIND as u16),
389 nostr::Kind::from(INVITE_EVENT_KIND as u16),
390 nostr::Kind::from(INVITE_RESPONSE_KIND as u16),
391 nostr::Kind::from(APP_KEYS_EVENT_KIND as u16),
392 ])
393 .limit(500);
394 Some(DirectMessageCommand::Subscribe {
395 subscription_id: "iris-native-private-chat".to_string(),
396 filters: vec![filter],
397 durable: true,
398 })
399 }
400
401 fn with_protocol_engine(mut self, keys: &Keys) -> Self {
402 let owner = keys.public_key();
403 let owner_hex = owner.to_hex();
404 let storage = Arc::new(SqliteStorageAdapter::new(
405 Arc::clone(&self.conn),
406 owner_hex.clone(),
407 owner_hex,
408 ));
409 match ProtocolEngine::load_or_create_for_local_device(storage, owner, keys) {
410 Ok(engine) => {
411 self.protocol_engine = Some(engine);
412 self.owner_public_key = Some(owner);
413 }
414 Err(error) => self.last_error = Some(format!("Direct message init failed: {error}")),
415 }
416 self
417 }
418
419 fn protocol_subscription_commands(&mut self) -> Vec<DirectMessageCommand> {
420 self.subscription_command().into_iter().collect()
421 }
422
423 fn commands_from_effects(&mut self, effects: Vec<ProtocolEffect>) -> Vec<DirectMessageCommand> {
424 let mut commands = Vec::new();
425 for effect in effects {
426 match effect {
427 ProtocolEffect::Publish(publish) => {
428 commands.push(DirectMessageCommand::Publish(publish.event));
429 }
430 ProtocolEffect::FetchProtocolState { filters, reason } => {
431 self.fetch_subscription_counter =
432 self.fetch_subscription_counter.saturating_add(1);
433 let subscription_id = format!(
434 "iris-native-private-chat-fetch-{reason}-{}",
435 self.fetch_subscription_counter
436 );
437 commands.push(DirectMessageCommand::Subscribe {
438 subscription_id,
439 filters,
440 durable: false,
441 });
442 }
443 }
444 }
445 commands
446 }
447
448 fn effects_from_retry_batch(&mut self, batch: ProtocolRetryBatch) -> Vec<ProtocolEffect> {
449 let mut effects = batch.effects;
450 for result in batch.direct_results {
451 if !result.event_ids.is_empty() {
452 self.mark_message_sent(&result.chat_id, &result.message_id);
453 }
454 effects.extend(result.effects);
455 }
456 effects.extend(batch.group_result.effects);
457 for message in batch.direct_messages {
458 self.apply_decrypted_protocol_message(message);
459 }
460 effects
461 }
462
463 fn apply_decrypted_protocol_message(&mut self, message: ProtocolDecryptedMessage) {
464 self.apply_decrypted(
465 message.sender,
466 message.conversation_owner,
467 &message.content,
468 message.event_id,
469 );
470 }
471
472 fn apply_decrypted(
473 &mut self,
474 sender: PublicKey,
475 conversation_owner: Option<PublicKey>,
476 content: &str,
477 source_event_id: Option<String>,
478 ) {
479 let Some(rumor) = parse_runtime_rumor(content) else {
480 return;
481 };
482 if rumor.kind != CHAT_MESSAGE_KIND {
483 return;
484 }
485 let local_owner = self.owner_public_key;
486 let peer = if local_owner == Some(sender) {
487 conversation_owner.unwrap_or(sender)
488 } else {
489 sender
490 };
491 let chat_id = peer.to_hex();
492 self.ensure_thread(&chat_id, rumor.created_at_secs);
493 self.insert_message(
494 &chat_id,
495 &rumor.id,
496 &rumor.content,
497 local_owner == Some(sender),
498 rumor.created_at_secs,
499 if local_owner == Some(sender) {
500 DirectMessageDelivery::Sent
501 } else {
502 DirectMessageDelivery::Received
503 },
504 source_event_id.as_deref(),
505 );
506 }
507
508 fn ensure_schema(&self) {
509 if let Ok(conn) = self.conn.lock() {
510 let _ = conn.execute_batch(SCHEMA);
511 }
512 }
513
514 fn ensure_thread(&self, chat_id: &str, updated_at: u64) {
515 if let Ok(conn) = self.conn.lock() {
516 let _ = conn.execute(
517 "INSERT INTO private_chat_threads (chat_id, display_name, avatar_seed, updated_at_secs)
518 VALUES (?1, '', '', ?2)
519 ON CONFLICT(chat_id) DO UPDATE SET updated_at_secs = MAX(updated_at_secs, excluded.updated_at_secs)",
520 params![chat_id, updated_at as i64],
521 );
522 }
523 }
524
525 fn messages(&self, chat_id: &str, limit: usize) -> Vec<DirectMessageSnapshot> {
526 let Ok(conn) = self.conn.lock() else {
527 return Vec::new();
528 };
529 let mut stmt = match conn.prepare(
530 "SELECT id, body, is_outgoing, created_at_secs, delivery
531 FROM private_chat_messages
532 WHERE chat_id = ?1
533 ORDER BY created_at_secs DESC, id DESC
534 LIMIT ?2",
535 ) {
536 Ok(stmt) => stmt,
537 Err(_) => return Vec::new(),
538 };
539 let rows = match stmt.query_map(params![chat_id, limit as i64], |row| {
540 Ok(DirectMessageSnapshot {
541 id: row.get(0)?,
542 chat_id: chat_id.to_string(),
543 body: row.get(1)?,
544 is_outgoing: row.get::<_, i64>(2)? != 0,
545 created_at_secs: row.get::<_, i64>(3)?.max(0) as u64,
546 delivery: DirectMessageDelivery::from_str(&row.get::<_, String>(4)?),
547 })
548 }) {
549 Ok(rows) => rows,
550 Err(_) => return Vec::new(),
551 };
552 let mut messages = rows.filter_map(Result::ok).collect::<Vec<_>>();
553 messages.reverse();
554 messages
555 }
556
557 fn insert_message(
558 &self,
559 chat_id: &str,
560 id: &str,
561 body: &str,
562 is_outgoing: bool,
563 created_at: u64,
564 delivery: DirectMessageDelivery,
565 source_event_id: Option<&str>,
566 ) {
567 if id.is_empty() {
568 return;
569 }
570 if let Ok(conn) = self.conn.lock() {
571 let _ = conn.execute(
572 "INSERT OR IGNORE INTO private_chat_messages
573 (chat_id, id, body, is_outgoing, created_at_secs, delivery, source_event_id)
574 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
575 params![
576 chat_id,
577 id,
578 body,
579 is_outgoing as i64,
580 created_at as i64,
581 delivery.as_str(),
582 source_event_id,
583 ],
584 );
585 let _ = conn.execute(
586 "UPDATE private_chat_threads SET updated_at_secs = MAX(updated_at_secs, ?2)
587 WHERE chat_id = ?1",
588 params![chat_id, created_at as i64],
589 );
590 }
591 }
592
593 fn seen_event(&self, event_id: &str) -> bool {
594 let Ok(conn) = self.conn.lock() else {
595 return true;
596 };
597 conn.query_row(
598 "SELECT 1 FROM private_chat_seen_events WHERE event_id = ?1",
599 [event_id],
600 |_| Ok(()),
601 )
602 .optional()
603 .ok()
604 .flatten()
605 .is_some()
606 }
607
608 fn mark_seen_event(&self, event_id: &str) {
609 if let Ok(conn) = self.conn.lock() {
610 let _ = conn.execute(
611 "INSERT OR IGNORE INTO private_chat_seen_events (event_id) VALUES (?1)",
612 [event_id],
613 );
614 }
615 }
616
617 fn mark_message_sent(&self, chat_id: &str, id: &str) {
618 if id.is_empty() {
619 return;
620 }
621 if let Ok(conn) = self.conn.lock() {
622 let _ = conn.execute(
623 "UPDATE private_chat_messages
624 SET delivery = ?3
625 WHERE chat_id = ?1 AND id = ?2",
626 params![chat_id, id, DirectMessageDelivery::Sent.as_str()],
627 );
628 }
629 }
630}
631
/// The fields of a decrypted inner event ("rumor") that this file uses.
struct RuntimeRumor {
    /// Event id in its string form.
    id: String,
    /// Numeric event kind.
    kind: u32,
    /// Event content.
    content: String,
    /// Creation time in seconds since the Unix epoch.
    created_at_secs: u64,
}
638
639fn parse_runtime_rumor(content: &str) -> Option<RuntimeRumor> {
640 let mut event = serde_json::from_str::<UnsignedEvent>(content).ok()?;
641 event.ensure_id();
642 event.verify_id().ok()?;
643 Some(RuntimeRumor {
644 id: event.id.as_ref()?.to_string(),
645 kind: event.kind.as_u16() as u32,
646 content: event.content,
647 created_at_secs: event.created_at.as_secs(),
648 })
649}
650
651fn chat_snapshot_for_pubkey(chat_id: &str) -> DirectChatSnapshot {
652 DirectChatSnapshot {
653 chat_id: chat_id.to_string(),
654 last_message_preview: String::new(),
655 last_message_at: 0,
656 unread_count: 0,
657 }
658}
659
660fn normalize_pubkey(input: &str) -> Result<String, String> {
661 PublicKey::parse(input)
662 .map(|pubkey| pubkey.to_hex())
663 .map_err(|error| error.to_string())
664}
665
/// Returns the current time in whole seconds since the Unix epoch, or `0`
/// when the system clock reads earlier than the epoch.
fn unix_now() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}