Skip to main content

iris_chat_protocol/
direct_messages.rs

1use std::path::Path;
2use std::sync::{Arc, Mutex};
3
4use nostr::{Event, Filter, Keys, PublicKey, UnsignedEvent};
5use rusqlite::{params, Connection, OptionalExtension};
6
7use crate::{
8    is_app_keys_event, AppKeys, ProtocolDecryptedMessage, ProtocolEffect, ProtocolEngine,
9    ProtocolRetryBatch, SharedConnection, SqliteStorageAdapter, UnixSeconds, APP_KEYS_EVENT_KIND,
10    CHAT_MESSAGE_KIND, INVITE_EVENT_KIND, INVITE_RESPONSE_KIND, MESSAGE_EVENT_KIND,
11};
12
13const SCHEMA: &str = r#"
14CREATE TABLE IF NOT EXISTS private_chat_threads (
15    chat_id TEXT PRIMARY KEY,
16    display_name TEXT NOT NULL,
17    avatar_seed TEXT NOT NULL,
18    updated_at_secs INTEGER NOT NULL DEFAULT 0
19);
20
21CREATE TABLE IF NOT EXISTS private_chat_messages (
22    chat_id TEXT NOT NULL,
23    id TEXT NOT NULL,
24    body TEXT NOT NULL,
25    is_outgoing INTEGER NOT NULL,
26    created_at_secs INTEGER NOT NULL,
27    delivery TEXT NOT NULL,
28    source_event_id TEXT,
29    PRIMARY KEY (chat_id, id)
30);
31
32CREATE INDEX IF NOT EXISTS private_chat_recent_idx
33    ON private_chat_messages(chat_id, created_at_secs, id);
34
35CREATE UNIQUE INDEX IF NOT EXISTS private_chat_source_event_idx
36    ON private_chat_messages(source_event_id)
37    WHERE source_event_id IS NOT NULL;
38
39CREATE TABLE IF NOT EXISTS private_chat_seen_events (
40    event_id TEXT PRIMARY KEY
41);
42
43CREATE TABLE IF NOT EXISTS ndr_kv (
44    owner_pubkey_hex TEXT NOT NULL,
45    device_pubkey_hex TEXT NOT NULL,
46    key TEXT NOT NULL,
47    value TEXT NOT NULL,
48    PRIMARY KEY (owner_pubkey_hex, device_pubkey_hex, key)
49);
50"#;
51
52#[derive(Clone, Debug, PartialEq, Eq)]
53pub enum DirectMessageDelivery {
54    Pending,
55    Sent,
56    Received,
57    Failed,
58}
59
60impl DirectMessageDelivery {
61    fn as_str(&self) -> &'static str {
62        match self {
63            Self::Pending => "pending",
64            Self::Sent => "sent",
65            Self::Received => "received",
66            Self::Failed => "failed",
67        }
68    }
69
70    fn from_str(value: &str) -> Self {
71        match value {
72            "sent" => Self::Sent,
73            "received" => Self::Received,
74            "failed" => Self::Failed,
75            _ => Self::Pending,
76        }
77    }
78}
79
80#[derive(Clone, Debug, PartialEq, Eq)]
81pub struct DirectMessageSnapshot {
82    pub id: String,
83    pub chat_id: String,
84    pub body: String,
85    pub is_outgoing: bool,
86    pub created_at_secs: u64,
87    pub delivery: DirectMessageDelivery,
88}
89
90#[derive(Clone, Debug, PartialEq, Eq)]
91pub struct DirectChatSnapshot {
92    pub chat_id: String,
93    pub last_message_preview: String,
94    pub last_message_at: u64,
95    pub unread_count: u32,
96}
97
98#[derive(Clone, Debug, PartialEq, Eq)]
99pub struct DirectThreadSnapshot {
100    pub chat: DirectChatSnapshot,
101    pub messages: Vec<DirectMessageSnapshot>,
102}
103
104#[derive(Clone, Debug)]
105pub enum DirectMessageCommand {
106    Publish(Event),
107    Subscribe {
108        subscription_id: String,
109        filters: Vec<Filter>,
110        durable: bool,
111    },
112}
113
114pub struct DirectMessageService {
115    conn: SharedConnection,
116    protocol_engine: Option<ProtocolEngine>,
117    owner_public_key: Option<PublicKey>,
118    relay_subscription_key: Option<String>,
119    fetch_subscription_counter: u64,
120    last_error: Option<String>,
121}
122
123impl DirectMessageService {
124    pub fn memory() -> Self {
125        let service = Self {
126            conn: Arc::new(Mutex::new(Connection::open_in_memory().unwrap())),
127            protocol_engine: None,
128            owner_public_key: None,
129            relay_subscription_key: None,
130            fetch_subscription_counter: 0,
131            last_error: None,
132        };
133        service.ensure_schema();
134        service
135    }
136
137    pub fn open(data_dir: &Path, owner_keys: Option<&Keys>) -> Self {
138        let path = data_dir.join("private-chat.sqlite3");
139        let conn = Connection::open(path).or_else(|_| Connection::open_in_memory());
140        let conn = match conn {
141            Ok(conn) => conn,
142            Err(error) => {
143                return Self {
144                    conn: Arc::new(Mutex::new(Connection::open_in_memory().unwrap())),
145                    protocol_engine: None,
146                    owner_public_key: None,
147                    relay_subscription_key: None,
148                    fetch_subscription_counter: 0,
149                    last_error: Some(format!("Direct message store open failed: {error}")),
150                };
151            }
152        };
153        let service = Self {
154            conn: Arc::new(Mutex::new(conn)),
155            protocol_engine: None,
156            owner_public_key: None,
157            relay_subscription_key: None,
158            fetch_subscription_counter: 0,
159            last_error: None,
160        };
161        service.ensure_schema();
162        if let Some(keys) = owner_keys {
163            service.with_protocol_engine(keys)
164        } else {
165            service
166        }
167    }
168
169    pub fn activate(&mut self, keys: &Keys) -> Vec<DirectMessageCommand> {
170        let next = Self {
171            conn: Arc::clone(&self.conn),
172            protocol_engine: None,
173            owner_public_key: None,
174            relay_subscription_key: self.relay_subscription_key.clone(),
175            fetch_subscription_counter: self.fetch_subscription_counter,
176            last_error: self.last_error.clone(),
177        }
178        .with_protocol_engine(keys);
179        self.protocol_engine = next.protocol_engine;
180        self.owner_public_key = next.owner_public_key;
181        self.protocol_subscription_commands()
182    }
183
184    pub fn last_error(&self) -> Option<String> {
185        self.last_error.clone()
186    }
187
188    pub fn chats(&self) -> Vec<DirectChatSnapshot> {
189        let Ok(conn) = self.conn.lock() else {
190            return Vec::new();
191        };
192        let mut stmt = match conn.prepare(
193            "SELECT t.chat_id,
194                    COALESCE(m.body, ''), COALESCE(m.created_at_secs, t.updated_at_secs)
195             FROM private_chat_threads t
196             LEFT JOIN private_chat_messages m
197               ON m.chat_id = t.chat_id
198              AND m.created_at_secs = (
199                    SELECT MAX(created_at_secs)
200                    FROM private_chat_messages
201                    WHERE chat_id = t.chat_id
202              )
203             ORDER BY COALESCE(m.created_at_secs, t.updated_at_secs) DESC, t.chat_id ASC",
204        ) {
205            Ok(stmt) => stmt,
206            Err(_) => return Vec::new(),
207        };
208        let rows = match stmt.query_map([], |row| {
209            Ok(DirectChatSnapshot {
210                chat_id: row.get(0)?,
211                last_message_preview: row.get(1)?,
212                last_message_at: row.get::<_, i64>(2)?.max(0) as u64,
213                unread_count: 0,
214            })
215        }) {
216            Ok(rows) => rows,
217            Err(_) => return Vec::new(),
218        };
219        rows.filter_map(Result::ok).collect()
220    }
221
222    pub fn thread(&self, chat_id: &str) -> Option<DirectThreadSnapshot> {
223        let chat_id = normalize_pubkey(chat_id).ok()?;
224        let chat = self
225            .chats()
226            .into_iter()
227            .find(|chat| chat.chat_id == chat_id)
228            .unwrap_or_else(|| chat_snapshot_for_pubkey(&chat_id));
229        let messages = self.messages(&chat_id, 160);
230        Some(DirectThreadSnapshot { chat, messages })
231    }
232
233    pub fn open_chat(
234        &mut self,
235        peer_input: &str,
236        _keys: &Keys,
237    ) -> Result<(DirectThreadSnapshot, Vec<DirectMessageCommand>), String> {
238        let public_key = PublicKey::parse(peer_input).map_err(|error| error.to_string())?;
239        let chat_id = public_key.to_hex();
240        self.ensure_thread(&chat_id, unix_now());
241        let commands = self.protocol_subscription_commands();
242        let thread = self
243            .thread(&chat_id)
244            .ok_or_else(|| "Chat open failed".to_string())?;
245        Ok((thread, commands))
246    }
247
248    pub fn send_message(
249        &mut self,
250        chat_id: &str,
251        body: &str,
252        _keys: &Keys,
253    ) -> Result<Vec<DirectMessageCommand>, String> {
254        let body = body.trim();
255        if body.is_empty() {
256            return Ok(Vec::new());
257        }
258        let public_key = PublicKey::parse(chat_id).map_err(|error| error.to_string())?;
259        let chat_id = public_key.to_hex();
260        self.ensure_thread(&chat_id, unix_now());
261        let engine = self
262            .protocol_engine
263            .as_mut()
264            .ok_or_else(|| "Direct message runtime is not ready".to_string())?;
265        let result = engine
266            .send_direct_text(public_key, &chat_id, body, None, UnixSeconds(unix_now()))
267            .map_err(|error| error.to_string())?;
268        let delivery = if result.event_ids.is_empty() {
269            DirectMessageDelivery::Pending
270        } else {
271            DirectMessageDelivery::Sent
272        };
273        self.insert_message(
274            &chat_id,
275            &result.message_id,
276            body,
277            true,
278            unix_now(),
279            delivery,
280            None,
281        );
282        Ok(self.commands_from_effects(result.effects))
283    }
284
285    pub fn process_event(&mut self, event: Event, _keys: &Keys) -> Vec<DirectMessageCommand> {
286        let event_id = event.id.to_hex();
287        if self.seen_event(&event_id) {
288            return Vec::new();
289        }
290        let Some(engine) = self.protocol_engine.as_mut() else {
291            return Vec::new();
292        };
293        let kind = event.kind.as_u16() as u32;
294        let mut effects = Vec::new();
295        let mut retry_batch = ProtocolRetryBatch::default();
296        let mut decrypted = None;
297
298        let processed = match kind {
299            APP_KEYS_EVENT_KIND if is_app_keys_event(&event) => match AppKeys::from_event(&event) {
300                Ok(app_keys) => match engine.ingest_app_keys_snapshot(
301                    event.pubkey,
302                    app_keys,
303                    event.created_at.as_secs(),
304                ) {
305                    Ok(batch) => {
306                        retry_batch = batch;
307                        true
308                    }
309                    Err(error) => {
310                        self.last_error = Some(format!("Direct message app keys failed: {error}"));
311                        false
312                    }
313                },
314                Err(_) => false,
315            },
316            INVITE_EVENT_KIND => match engine.observe_invite_event(&event) {
317                Ok(batch) => {
318                    retry_batch = batch;
319                    true
320                }
321                Err(_) => false,
322            },
323            INVITE_RESPONSE_KIND => match engine.observe_invite_response_event(&event) {
324                Ok(batch) => {
325                    retry_batch = batch;
326                    true
327                }
328                Err(_) => false,
329            },
330            MESSAGE_EVENT_KIND => match engine.process_direct_message_event(&event) {
331                Ok(message) => {
332                    decrypted = message;
333                    true
334                }
335                Err(_) => false,
336            },
337            _ => false,
338        };
339
340        if !processed {
341            return Vec::new();
342        }
343        self.mark_seen_event(&event_id);
344        if let Some(message) = decrypted {
345            self.apply_decrypted_protocol_message(message);
346        }
347        effects.extend(self.effects_from_retry_batch(retry_batch));
348        self.commands_from_effects(effects)
349    }
350
351    pub fn mobile_push_message_author_pubkeys(&self) -> Vec<String> {
352        let Some(engine) = self.protocol_engine.as_ref() else {
353            return Vec::new();
354        };
355        let mut authors = engine
356            .known_message_author_pubkeys()
357            .into_iter()
358            .map(|pubkey| pubkey.to_hex())
359            .collect::<Vec<_>>();
360        authors.sort();
361        authors.dedup();
362        authors
363    }
364
365    fn subscription_command(&mut self) -> Option<DirectMessageCommand> {
366        let engine = self.protocol_engine.as_ref()?;
367        let authors = engine
368            .known_message_author_pubkeys()
369            .into_iter()
370            .chain(self.owner_public_key)
371            .collect::<Vec<_>>();
372        let mut author_hexes = authors.iter().map(PublicKey::to_hex).collect::<Vec<_>>();
373        author_hexes.sort();
374        author_hexes.dedup();
375        let key = author_hexes.join(",");
376        if key.is_empty() || self.relay_subscription_key.as_deref() == Some(key.as_str()) {
377            return None;
378        }
379        self.relay_subscription_key = Some(key);
380
381        let public_keys = author_hexes
382            .iter()
383            .filter_map(|hex| PublicKey::parse(hex).ok())
384            .collect::<Vec<_>>();
385        let filter = Filter::new()
386            .authors(public_keys)
387            .kinds([
388                nostr::Kind::from(MESSAGE_EVENT_KIND as u16),
389                nostr::Kind::from(INVITE_EVENT_KIND as u16),
390                nostr::Kind::from(INVITE_RESPONSE_KIND as u16),
391                nostr::Kind::from(APP_KEYS_EVENT_KIND as u16),
392            ])
393            .limit(500);
394        Some(DirectMessageCommand::Subscribe {
395            subscription_id: "iris-native-private-chat".to_string(),
396            filters: vec![filter],
397            durable: true,
398        })
399    }
400
401    fn with_protocol_engine(mut self, keys: &Keys) -> Self {
402        let owner = keys.public_key();
403        let owner_hex = owner.to_hex();
404        let storage = Arc::new(SqliteStorageAdapter::new(
405            Arc::clone(&self.conn),
406            owner_hex.clone(),
407            owner_hex,
408        ));
409        match ProtocolEngine::load_or_create_for_local_device(storage, owner, keys) {
410            Ok(engine) => {
411                self.protocol_engine = Some(engine);
412                self.owner_public_key = Some(owner);
413            }
414            Err(error) => self.last_error = Some(format!("Direct message init failed: {error}")),
415        }
416        self
417    }
418
419    fn protocol_subscription_commands(&mut self) -> Vec<DirectMessageCommand> {
420        self.subscription_command().into_iter().collect()
421    }
422
423    fn commands_from_effects(&mut self, effects: Vec<ProtocolEffect>) -> Vec<DirectMessageCommand> {
424        let mut commands = Vec::new();
425        for effect in effects {
426            match effect {
427                ProtocolEffect::Publish(publish) => {
428                    commands.push(DirectMessageCommand::Publish(publish.event));
429                }
430                ProtocolEffect::FetchProtocolState { filters, reason } => {
431                    self.fetch_subscription_counter =
432                        self.fetch_subscription_counter.saturating_add(1);
433                    let subscription_id = format!(
434                        "iris-native-private-chat-fetch-{reason}-{}",
435                        self.fetch_subscription_counter
436                    );
437                    commands.push(DirectMessageCommand::Subscribe {
438                        subscription_id,
439                        filters,
440                        durable: false,
441                    });
442                }
443            }
444        }
445        commands
446    }
447
448    fn effects_from_retry_batch(&mut self, batch: ProtocolRetryBatch) -> Vec<ProtocolEffect> {
449        let mut effects = batch.effects;
450        for result in batch.direct_results {
451            if !result.event_ids.is_empty() {
452                self.mark_message_sent(&result.chat_id, &result.message_id);
453            }
454            effects.extend(result.effects);
455        }
456        effects.extend(batch.group_result.effects);
457        for message in batch.direct_messages {
458            self.apply_decrypted_protocol_message(message);
459        }
460        effects
461    }
462
463    fn apply_decrypted_protocol_message(&mut self, message: ProtocolDecryptedMessage) {
464        self.apply_decrypted(
465            message.sender,
466            message.conversation_owner,
467            &message.content,
468            message.event_id,
469        );
470    }
471
472    fn apply_decrypted(
473        &mut self,
474        sender: PublicKey,
475        conversation_owner: Option<PublicKey>,
476        content: &str,
477        source_event_id: Option<String>,
478    ) {
479        let Some(rumor) = parse_runtime_rumor(content) else {
480            return;
481        };
482        if rumor.kind != CHAT_MESSAGE_KIND {
483            return;
484        }
485        let local_owner = self.owner_public_key;
486        let peer = if local_owner == Some(sender) {
487            conversation_owner.unwrap_or(sender)
488        } else {
489            sender
490        };
491        let chat_id = peer.to_hex();
492        self.ensure_thread(&chat_id, rumor.created_at_secs);
493        self.insert_message(
494            &chat_id,
495            &rumor.id,
496            &rumor.content,
497            local_owner == Some(sender),
498            rumor.created_at_secs,
499            if local_owner == Some(sender) {
500                DirectMessageDelivery::Sent
501            } else {
502                DirectMessageDelivery::Received
503            },
504            source_event_id.as_deref(),
505        );
506    }
507
508    fn ensure_schema(&self) {
509        if let Ok(conn) = self.conn.lock() {
510            let _ = conn.execute_batch(SCHEMA);
511        }
512    }
513
514    fn ensure_thread(&self, chat_id: &str, updated_at: u64) {
515        if let Ok(conn) = self.conn.lock() {
516            let _ = conn.execute(
517                "INSERT INTO private_chat_threads (chat_id, display_name, avatar_seed, updated_at_secs)
518                 VALUES (?1, '', '', ?2)
519                 ON CONFLICT(chat_id) DO UPDATE SET updated_at_secs = MAX(updated_at_secs, excluded.updated_at_secs)",
520                params![chat_id, updated_at as i64],
521            );
522        }
523    }
524
525    fn messages(&self, chat_id: &str, limit: usize) -> Vec<DirectMessageSnapshot> {
526        let Ok(conn) = self.conn.lock() else {
527            return Vec::new();
528        };
529        let mut stmt = match conn.prepare(
530            "SELECT id, body, is_outgoing, created_at_secs, delivery
531             FROM private_chat_messages
532             WHERE chat_id = ?1
533             ORDER BY created_at_secs DESC, id DESC
534             LIMIT ?2",
535        ) {
536            Ok(stmt) => stmt,
537            Err(_) => return Vec::new(),
538        };
539        let rows = match stmt.query_map(params![chat_id, limit as i64], |row| {
540            Ok(DirectMessageSnapshot {
541                id: row.get(0)?,
542                chat_id: chat_id.to_string(),
543                body: row.get(1)?,
544                is_outgoing: row.get::<_, i64>(2)? != 0,
545                created_at_secs: row.get::<_, i64>(3)?.max(0) as u64,
546                delivery: DirectMessageDelivery::from_str(&row.get::<_, String>(4)?),
547            })
548        }) {
549            Ok(rows) => rows,
550            Err(_) => return Vec::new(),
551        };
552        let mut messages = rows.filter_map(Result::ok).collect::<Vec<_>>();
553        messages.reverse();
554        messages
555    }
556
557    fn insert_message(
558        &self,
559        chat_id: &str,
560        id: &str,
561        body: &str,
562        is_outgoing: bool,
563        created_at: u64,
564        delivery: DirectMessageDelivery,
565        source_event_id: Option<&str>,
566    ) {
567        if id.is_empty() {
568            return;
569        }
570        if let Ok(conn) = self.conn.lock() {
571            let _ = conn.execute(
572                "INSERT OR IGNORE INTO private_chat_messages
573                 (chat_id, id, body, is_outgoing, created_at_secs, delivery, source_event_id)
574                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
575                params![
576                    chat_id,
577                    id,
578                    body,
579                    is_outgoing as i64,
580                    created_at as i64,
581                    delivery.as_str(),
582                    source_event_id,
583                ],
584            );
585            let _ = conn.execute(
586                "UPDATE private_chat_threads SET updated_at_secs = MAX(updated_at_secs, ?2)
587                 WHERE chat_id = ?1",
588                params![chat_id, created_at as i64],
589            );
590        }
591    }
592
593    fn seen_event(&self, event_id: &str) -> bool {
594        let Ok(conn) = self.conn.lock() else {
595            return true;
596        };
597        conn.query_row(
598            "SELECT 1 FROM private_chat_seen_events WHERE event_id = ?1",
599            [event_id],
600            |_| Ok(()),
601        )
602        .optional()
603        .ok()
604        .flatten()
605        .is_some()
606    }
607
608    fn mark_seen_event(&self, event_id: &str) {
609        if let Ok(conn) = self.conn.lock() {
610            let _ = conn.execute(
611                "INSERT OR IGNORE INTO private_chat_seen_events (event_id) VALUES (?1)",
612                [event_id],
613            );
614        }
615    }
616
617    fn mark_message_sent(&self, chat_id: &str, id: &str) {
618        if id.is_empty() {
619            return;
620        }
621        if let Ok(conn) = self.conn.lock() {
622            let _ = conn.execute(
623                "UPDATE private_chat_messages
624                 SET delivery = ?3
625                 WHERE chat_id = ?1 AND id = ?2",
626                params![chat_id, id, DirectMessageDelivery::Sent.as_str()],
627            );
628        }
629    }
630}
631
632struct RuntimeRumor {
633    id: String,
634    kind: u32,
635    content: String,
636    created_at_secs: u64,
637}
638
639fn parse_runtime_rumor(content: &str) -> Option<RuntimeRumor> {
640    let mut event = serde_json::from_str::<UnsignedEvent>(content).ok()?;
641    event.ensure_id();
642    event.verify_id().ok()?;
643    Some(RuntimeRumor {
644        id: event.id.as_ref()?.to_string(),
645        kind: event.kind.as_u16() as u32,
646        content: event.content,
647        created_at_secs: event.created_at.as_secs(),
648    })
649}
650
651fn chat_snapshot_for_pubkey(chat_id: &str) -> DirectChatSnapshot {
652    DirectChatSnapshot {
653        chat_id: chat_id.to_string(),
654        last_message_preview: String::new(),
655        last_message_at: 0,
656        unread_count: 0,
657    }
658}
659
660fn normalize_pubkey(input: &str) -> Result<String, String> {
661    PublicKey::parse(input)
662        .map(|pubkey| pubkey.to_hex())
663        .map_err(|error| error.to_string())
664}
665
666fn unix_now() -> u64 {
667    std::time::SystemTime::now()
668        .duration_since(std::time::UNIX_EPOCH)
669        .map(|duration| duration.as_secs())
670        .unwrap_or_default()
671}