saorsa_core/messaging/
database.rs

1// Message database implementation using deadpool-sqlite + rusqlite
2// Replaced sqlx to resolve RSA security vulnerability RUSTSEC-2023-0071
3
4use crate::messaging::types::*;
5use crate::messaging::user_handle::UserHandle;
6use anyhow::Result;
7use chrono::{DateTime, TimeZone, Utc};
8use deadpool_sqlite::{Config, Pool, Runtime};
9use rusqlite::params;
10use serde_json;
11use std::collections::HashMap;
12use std::path::PathBuf;
13use tracing::{debug, info};
14use uuid::Uuid;
15
16pub use crate::dht::client::DhtClient;
17
/// Message store using deadpool-sqlite for connection pooling.
///
/// Each operation obtains a connection from `pool` and runs its SQL through
/// the connection's `interact` closure.
#[derive(Clone)]
pub struct DatabaseMessageStore {
    /// Connection pool used for every database operation.
    pool: Pool,
    /// DHT client handed in at construction; stored but not yet read by this type.
    #[allow(dead_code)] // TODO: Integrate DHT client functionality
    dht_client: DhtClient,
}
25
26impl DatabaseMessageStore {
27    /// Create a new database message store
28    pub async fn new(dht_client: DhtClient, db_path: Option<PathBuf>) -> Result<Self> {
29        let db_path = db_path.unwrap_or_else(|| {
30            let mut path = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
31            path.push("saorsa");
32            std::fs::create_dir_all(&path).unwrap_or(());
33            path.push("messages.db");
34            path
35        });
36
37        info!("Initializing message database at: {:?}", db_path);
38
39        // Create deadpool-sqlite configuration
40        let cfg = Config::new(db_path);
41        let pool = cfg.create_pool(Runtime::Tokio1)?;
42
43        let store = Self { pool, dht_client };
44
45        // Initialize database schema
46        store.init_schema().await?;
47
48        Ok(store)
49    }
50
51    /// Initialize the database schema with all required tables
52    async fn init_schema(&self) -> Result<()> {
53        let conn = self.pool.get().await?;
54
55        let result = conn.interact(|conn| -> Result<(), rusqlite::Error> {
56            // Configure SQLite for optimal performance
57            conn.execute("PRAGMA journal_mode = WAL", [])?;
58            conn.execute("PRAGMA synchronous = NORMAL", [])?;
59            conn.execute("PRAGMA cache_size = -64000", [])?; // 64MB cache
60            conn.execute("PRAGMA foreign_keys = ON", [])?;
61            conn.execute("PRAGMA temp_store = MEMORY", [])?;
62            conn.execute("PRAGMA mmap_size = 268435456", [])?; // 256MB mmap
63
64            // Messages table
65            conn.execute(
66                "CREATE TABLE IF NOT EXISTS messages (
67                    id TEXT PRIMARY KEY NOT NULL,
68                    channel_id TEXT NOT NULL,
69                    sender TEXT NOT NULL,
70                    content TEXT NOT NULL,
71                    thread_id TEXT,
72                    reply_to TEXT,
73                    created_at INTEGER NOT NULL,
74                    edited_at INTEGER,
75                    deleted_at INTEGER,
76                    ephemeral INTEGER DEFAULT 0,
77                    signature TEXT NOT NULL DEFAULT ''
78                )",
79                [],
80            )?;
81
82            // Attachments table
83            conn.execute(
84                "CREATE TABLE IF NOT EXISTS attachments (
85                    id TEXT PRIMARY KEY NOT NULL,
86                    message_id TEXT NOT NULL,
87                    filename TEXT NOT NULL,
88                    mime_type TEXT NOT NULL,
89                    size_bytes INTEGER NOT NULL,
90                    dht_hash TEXT NOT NULL,
91                    thumbnail BLOB,
92                    metadata TEXT DEFAULT '{}',
93                    FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
94                )",
95                [],
96            )?;
97
98            // Reactions table
99            conn.execute(
100                "CREATE TABLE IF NOT EXISTS reactions (
101                    id INTEGER PRIMARY KEY AUTOINCREMENT,
102                    message_id TEXT NOT NULL,
103                    emoji TEXT NOT NULL,
104                    user_id TEXT NOT NULL,
105                    created_at INTEGER NOT NULL,
106                    FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE,
107                    UNIQUE(message_id, emoji, user_id)
108                )",
109                [],
110            )?;
111
112            // Mentions table
113            conn.execute(
114                "CREATE TABLE IF NOT EXISTS mentions (
115                    id INTEGER PRIMARY KEY AUTOINCREMENT,
116                    message_id TEXT NOT NULL,
117                    user TEXT NOT NULL,
118                    FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
119                )",
120                [],
121            )?;
122
123            // Read receipts table
124            conn.execute(
125                "CREATE TABLE IF NOT EXISTS read_receipts (
126                    message_id TEXT NOT NULL,
127                    user_id TEXT NOT NULL,
128                    read_at INTEGER NOT NULL,
129                    PRIMARY KEY (message_id, user_id),
130                    FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
131                )",
132                [],
133            )?;
134
135            // Create indexes for performance
136            conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id, created_at DESC)", [])?;
137            conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id, created_at)", [])?;
138            conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_sender ON messages(sender)", [])?;
139            conn.execute("CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id)", [])?;
140            conn.execute("CREATE INDEX IF NOT EXISTS idx_reactions_message ON reactions(message_id)", [])?;
141            conn.execute("CREATE INDEX IF NOT EXISTS idx_mentions_user ON mentions(user)", [])?;
142
143            Ok(())
144        }).await;
145
146        match result {
147            Ok(Ok(())) => {
148                info!("Database schema initialized successfully");
149                Ok(())
150            }
151            Ok(Err(e)) => Err(anyhow::anyhow!("Database schema creation failed: {}", e)),
152            Err(e) => Err(anyhow::anyhow!(
153                "Failed to execute schema initialization: {}",
154                e
155            )),
156        }
157    }
158
159    /// Store a message in the database
160    pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
161        let conn = self.pool.get().await?;
162
163        // Serialize content
164        let content_json = serde_json::to_string(&message.content)?;
165        let message_clone = message.clone();
166
167        let result = conn
168            .interact(move |conn| -> Result<(), rusqlite::Error> {
169                let tx = conn.transaction()?;
170
171                // Insert main message
172                tx.execute(
173                    "INSERT OR REPLACE INTO messages (
174                    id, channel_id, sender, content, thread_id, reply_to,
175                    created_at, edited_at, deleted_at, ephemeral, signature
176                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
177                    params![
178                        message_clone.id.to_string(),
179                        message_clone.channel_id.to_string(),
180                        message_clone.sender.to_string(),
181                        content_json,
182                        message_clone.thread_id.as_ref().map(|id| id.to_string()),
183                        message_clone.reply_to.as_ref().map(|id| id.to_string()),
184                        message_clone.created_at.timestamp_millis(),
185                        message_clone
186                            .edited_at
187                            .as_ref()
188                            .map(|dt| dt.timestamp_millis()),
189                        message_clone
190                            .deleted_at
191                            .as_ref()
192                            .map(|dt| dt.timestamp_millis()),
193                        message_clone.ephemeral as i32,
194                        hex::encode(&message_clone.signature.signature)
195                    ],
196                )?;
197
198                // Insert attachments
199                for attachment in &message_clone.attachments {
200                    let metadata_json =
201                        serde_json::to_string(&attachment.metadata).unwrap_or_default();
202
203                    tx.execute(
204                        "INSERT OR REPLACE INTO attachments (
205                        id, message_id, filename, mime_type, size_bytes,
206                        dht_hash, thumbnail, metadata
207                    ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
208                        params![
209                            attachment.id,
210                            message_clone.id.to_string(),
211                            attachment.filename,
212                            attachment.mime_type,
213                            attachment.size_bytes,
214                            attachment.dht_hash,
215                            attachment.thumbnail.as_ref(),
216                            metadata_json
217                        ],
218                    )?;
219                }
220
221                // Insert mentions
222                for mention in &message_clone.mentions {
223                    tx.execute(
224                        "INSERT OR IGNORE INTO mentions (message_id, user) VALUES (?1, ?2)",
225                        params![message_clone.id.to_string(), mention.to_string()],
226                    )?;
227                }
228
229                // Insert reactions
230                for (emoji, users) in &message_clone.reactions {
231                    for user in users {
232                        tx.execute(
233                        "INSERT OR IGNORE INTO reactions (message_id, emoji, user_id, created_at) 
234                         VALUES (?1, ?2, ?3, ?4)",
235                        params![
236                            message_clone.id.to_string(),
237                            emoji,
238                            user.to_string(),
239                            chrono::Utc::now().timestamp_millis()
240                        ],
241                    )?;
242                    }
243                }
244
245                tx.commit()?;
246                Ok(())
247            })
248            .await;
249
250        match result {
251            Ok(Ok(())) => {
252                debug!("Message stored successfully: {}", message.id);
253                Ok(())
254            }
255            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to store message: {}", e)),
256            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
257        }
258    }
259
260    /// Retrieve a message by ID
261    pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
262        let conn = self.pool.get().await?;
263        let message_id = id.to_string();
264
265        let result = conn
266            .interact(move |conn| -> Result<RichMessage, rusqlite::Error> {
267                let mut stmt = conn.prepare(
268                    "SELECT id, channel_id, sender, content, thread_id, reply_to,
269                        created_at, edited_at, deleted_at, ephemeral, signature
270                 FROM messages WHERE id = ?1",
271                )?;
272
273                let row = stmt.query_row(params![message_id], |row| {
274                    let content_json: String = row.get("content")?;
275                    let content: MessageContent =
276                        serde_json::from_str(&content_json).map_err(|_| {
277                            rusqlite::Error::InvalidColumnType(
278                                0,
279                                "content".to_string(),
280                                rusqlite::types::Type::Text,
281                            )
282                        })?;
283
284                    let created_at = Utc
285                        .timestamp_millis_opt(row.get("created_at")?)
286                        .single()
287                        .ok_or(rusqlite::Error::InvalidColumnType(
288                            0,
289                            "created_at".to_string(),
290                            rusqlite::types::Type::Integer,
291                        ))?;
292
293                    let edited_at: Option<i64> = row.get("edited_at")?;
294                    let edited_at = edited_at.and_then(|ts| Utc.timestamp_millis_opt(ts).single());
295
296                    let deleted_at: Option<i64> = row.get("deleted_at")?;
297                    let deleted_at =
298                        deleted_at.and_then(|ts| Utc.timestamp_millis_opt(ts).single());
299
300                    let thread_id: Option<String> = row.get("thread_id")?;
301                    let thread_id = thread_id.and_then(|s| Uuid::parse_str(&s).ok().map(ThreadId));
302
303                    let reply_to: Option<String> = row.get("reply_to")?;
304                    let reply_to = reply_to.and_then(|s| Uuid::parse_str(&s).ok().map(MessageId));
305
306                    let signature_hex: String = row.get("signature")?;
307                    let signature_bytes = hex::decode(&signature_hex).unwrap_or_default();
308
309                    Ok(RichMessage {
310                        id: MessageId(Uuid::parse_str(&row.get::<_, String>("id")?).map_err(
311                            |_| {
312                                rusqlite::Error::InvalidColumnType(
313                                    0,
314                                    "id".to_string(),
315                                    rusqlite::types::Type::Text,
316                                )
317                            },
318                        )?),
319                        channel_id: ChannelId(
320                            Uuid::parse_str(&row.get::<_, String>("channel_id")?).map_err(
321                                |_| {
322                                    rusqlite::Error::InvalidColumnType(
323                                        0,
324                                        "channel_id".to_string(),
325                                        rusqlite::types::Type::Text,
326                                    )
327                                },
328                            )?,
329                        ),
330                        sender: UserHandle::from(row.get::<_, String>("sender")?),
331                        content,
332                        thread_id,
333                        reply_to,
334                        created_at,
335                        edited_at,
336                        deleted_at,
337                        ephemeral: row.get::<_, i32>("ephemeral")? != 0,
338                        attachments: Vec::new(),   // Filled below
339                        mentions: Vec::new(),      // Filled below
340                        reactions: HashMap::new(), // Filled below
341                        read_by: HashMap::new(),
342                        delivered_to: HashMap::new(),
343                        expires_at: None,
344                        thread_count: 0,
345                        last_thread_reply: None,
346                        sender_device: crate::messaging::types::DeviceId("primary".to_string()),
347                        encryption: EncryptionMethod::E2E,
348                        signature: MessageSignature {
349                            algorithm: "ed25519".to_string(),
350                            signature: signature_bytes,
351                        },
352                    })
353                })?;
354
355                Ok(row)
356            })
357            .await;
358
359        match result {
360            Ok(Ok(mut message)) => {
361                // Load attachments, mentions, and reactions
362                message.attachments = self.get_attachments(message.id).await?;
363                message.mentions = self.get_mentions(message.id).await?;
364                message.reactions = self.get_reactions(message.id).await?;
365                Ok(message)
366            }
367            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to retrieve message: {}", e)),
368            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
369        }
370    }
371
372    /// Get attachments for a message
373    async fn get_attachments(&self, message_id: MessageId) -> Result<Vec<Attachment>> {
374        let conn = self.pool.get().await?;
375        let msg_id = message_id.to_string();
376
377        let result = conn
378            .interact(move |conn| -> Result<Vec<Attachment>, rusqlite::Error> {
379                let mut stmt = conn.prepare(
380                    "SELECT id, filename, mime_type, size_bytes, dht_hash, thumbnail, metadata
381                 FROM attachments WHERE message_id = ?1",
382                )?;
383
384                let rows = stmt.query_map(params![msg_id], |row| {
385                    let metadata_json: String = row.get("metadata")?;
386                    let metadata: HashMap<String, String> =
387                        serde_json::from_str(&metadata_json).unwrap_or_default();
388
389                    let thumbnail: Option<Vec<u8>> = row.get("thumbnail")?;
390
391                    Ok(Attachment {
392                        id: row.get("id")?,
393                        filename: row.get("filename")?,
394                        mime_type: row.get("mime_type")?,
395                        size_bytes: row.get("size_bytes")?,
396                        dht_hash: row.get("dht_hash")?,
397                        thumbnail,
398                        metadata,
399                        encryption_key: None, // Not stored in DB for security
400                    })
401                })?;
402
403                let mut attachments = Vec::new();
404                for row in rows {
405                    attachments.push(row?);
406                }
407                Ok(attachments)
408            })
409            .await;
410
411        match result {
412            Ok(Ok(attachments)) => Ok(attachments),
413            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get attachments: {}", e)),
414            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
415        }
416    }
417
418    /// Get mentions for a message
419    async fn get_mentions(&self, message_id: MessageId) -> Result<Vec<crate::messaging::user_handle::UserHandle>> {
420        let conn = self.pool.get().await?;
421        let msg_id = message_id.to_string();
422
423        let result = conn
424            .interact(move |conn| -> Result<Vec<String>, rusqlite::Error> {
425                let mut stmt = conn.prepare("SELECT user FROM mentions WHERE message_id = ?1")?;
426                let rows = stmt.query_map(params![msg_id], |row| row.get::<_, String>("user"))?;
427
428                let mut mentions = Vec::new();
429                for row in rows {
430                    mentions.push(row?);
431                }
432                Ok(mentions)
433            })
434            .await;
435
436        match result {
437            Ok(Ok(user_strings)) => Ok(
438                user_strings
439                    .into_iter()
440                    .map(crate::messaging::user_handle::UserHandle::from)
441                    .collect(),
442            ),
443            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get mentions: {}", e)),
444            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
445        }
446    }
447
448    /// Get reactions for a message
449    async fn get_reactions(
450        &self,
451        message_id: MessageId,
452    ) -> Result<HashMap<String, Vec<crate::messaging::user_handle::UserHandle>>> {
453        let conn = self.pool.get().await?;
454        let msg_id = message_id.to_string();
455
456        let result = conn
457            .interact(
458                move |conn| -> Result<HashMap<String, Vec<String>>, rusqlite::Error> {
459                    let mut stmt = conn.prepare(
460                "SELECT emoji, user_id FROM reactions WHERE message_id = ?1 ORDER BY created_at"
461            )?;
462                    let rows = stmt.query_map(params![msg_id], |row| {
463                        Ok((
464                            row.get::<_, String>("emoji")?,
465                            row.get::<_, String>("user_id")?,
466                        ))
467                    })?;
468
469                    let mut reactions: HashMap<String, Vec<String>> = HashMap::new();
470                    for row in rows {
471                        let (emoji, user_id) = row?;
472                        reactions.entry(emoji).or_default().push(user_id);
473                    }
474                    Ok(reactions)
475                },
476            )
477            .await;
478
479        match result {
480            Ok(Ok(reaction_strings)) => Ok(
481                reaction_strings
482                    .into_iter()
483                    .map(|(emoji, users)| {
484                        let handles = users
485                            .into_iter()
486                            .map(crate::messaging::user_handle::UserHandle::from)
487                            .collect();
488                        (emoji, handles)
489                    })
490                    .collect(),
491            ),
492            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get reactions: {}", e)),
493            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
494        }
495    }
496
    /// Update an existing message.
    ///
    /// Delegates to `store_message`, which writes the row keyed by
    /// `message.id`, so the stored copy is overwritten with `message`.
    pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
        // No selective update is performed here: the whole message is written
        // again through the regular store path.
        self.store_message(message).await
    }
503
504    /// Get messages for a channel
505    pub async fn get_channel_messages(
506        &self,
507        channel_id: ChannelId,
508        limit: usize,
509        before: Option<DateTime<Utc>>,
510    ) -> Result<Vec<RichMessage>> {
511        let conn = self.pool.get().await?;
512        let chan_id = channel_id.to_string();
513        let before_ts = before.map(|dt| dt.timestamp_millis()).unwrap_or(i64::MAX);
514
515        let result = conn
516            .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
517                let mut stmt = conn.prepare(
518                    "SELECT id FROM messages 
519                 WHERE channel_id = ?1 AND created_at < ?2 AND deleted_at IS NULL
520                 ORDER BY created_at DESC LIMIT ?3",
521                )?;
522
523                let rows = stmt.query_map(params![chan_id, before_ts, limit as i64], |row| {
524                    let id_str: String = row.get("id")?;
525                    Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
526                        rusqlite::Error::InvalidColumnType(
527                            0,
528                            "id".to_string(),
529                            rusqlite::types::Type::Text,
530                        )
531                    })
532                })?;
533
534                let mut message_ids = Vec::new();
535                for row in rows {
536                    message_ids.push(row?);
537                }
538                Ok(message_ids)
539            })
540            .await;
541
542        match result {
543            Ok(Ok(message_ids)) => {
544                let mut messages = Vec::new();
545                for id in message_ids {
546                    if let Ok(msg) = self.get_message(id).await {
547                        messages.push(msg);
548                    }
549                }
550                Ok(messages)
551            }
552            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get channel messages: {}", e)),
553            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
554        }
555    }
556
557    /// Mark a message as read
558    pub async fn mark_as_read(&self, message_id: MessageId, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
559        let conn = self.pool.get().await?;
560        let msg_id = message_id.to_string();
561        let user_str = user.as_str().to_string();
562
563        let result = conn
564            .interact(move |conn| -> Result<(), rusqlite::Error> {
565                conn.execute(
566                    "INSERT OR REPLACE INTO read_receipts (message_id, user_id, read_at) 
567                 VALUES (?1, ?2, ?3)",
568                    params![msg_id, user_str, chrono::Utc::now().timestamp_millis()],
569                )?;
570                Ok(())
571            })
572            .await;
573
574        match result {
575            Ok(Ok(())) => {
576                debug!("Message {} marked as read by {}", message_id, user);
577                Ok(())
578            }
579            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to mark message as read: {}", e)),
580            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
581        }
582    }
583
584    /// Search messages by content
585    pub async fn search_messages(
586        &self,
587        query: &str,
588        channel_id: Option<ChannelId>,
589        limit: usize,
590    ) -> Result<Vec<RichMessage>> {
591        let conn = self.pool.get().await?;
592        let search_query = format!("%{}%", query);
593        let chan_filter = channel_id.map(|id| id.to_string());
594
595        let result = conn
596            .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
597                let (sql, params): (String, Vec<Box<dyn rusqlite::ToSql + Send>>) =
598                    if let Some(channel) = chan_filter {
599                        (
600                            "SELECT id FROM messages 
601                     WHERE content LIKE ?1 AND channel_id = ?2 AND deleted_at IS NULL
602                     ORDER BY created_at DESC LIMIT ?3"
603                                .to_string(),
604                            vec![
605                                Box::new(search_query),
606                                Box::new(channel),
607                                Box::new(limit as i64),
608                            ],
609                        )
610                    } else {
611                        (
612                            "SELECT id FROM messages 
613                     WHERE content LIKE ?1 AND deleted_at IS NULL
614                     ORDER BY created_at DESC LIMIT ?2"
615                                .to_string(),
616                            vec![Box::new(search_query), Box::new(limit as i64)],
617                        )
618                    };
619
620                let mut stmt = conn.prepare(&sql)?;
621                let param_refs: Vec<&dyn rusqlite::ToSql> = params
622                    .iter()
623                    .map(|p| p.as_ref() as &dyn rusqlite::ToSql)
624                    .collect();
625
626                let rows = stmt.query_map(param_refs.as_slice(), |row| {
627                    let id_str: String = row.get("id")?;
628                    Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
629                        rusqlite::Error::InvalidColumnType(
630                            0,
631                            "id".to_string(),
632                            rusqlite::types::Type::Text,
633                        )
634                    })
635                })?;
636
637                let mut message_ids = Vec::new();
638                for row in rows {
639                    message_ids.push(row?);
640                }
641                Ok(message_ids)
642            })
643            .await;
644
645        match result {
646            Ok(Ok(message_ids)) => {
647                let mut messages = Vec::new();
648                for id in message_ids {
649                    if let Ok(msg) = self.get_message(id).await {
650                        messages.push(msg);
651                    }
652                }
653                Ok(messages)
654            }
655            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to search messages: {}", e)),
656            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
657        }
658    }
659
660    /// Add a reaction to a message
661    pub async fn add_reaction(
662        &self,
663        message_id: MessageId,
664        emoji: String,
665        user: crate::messaging::user_handle::UserHandle,
666    ) -> Result<()> {
667        let conn = self.pool.get().await?;
668        let msg_id = message_id.to_string();
669        let user_str = user.as_str().to_string();
670        let emoji_clone = emoji.clone();
671
672        let result = conn
673            .interact(move |conn| -> Result<(), rusqlite::Error> {
674                conn.execute(
675                    "INSERT OR IGNORE INTO reactions (message_id, emoji, user_id, created_at) 
676                 VALUES (?1, ?2, ?3, ?4)",
677                    params![
678                        msg_id,
679                        emoji_clone,
680                        user_str,
681                        chrono::Utc::now().timestamp_millis()
682                    ],
683                )?;
684                Ok(())
685            })
686            .await;
687
688        match result {
689            Ok(Ok(())) => {
690                debug!(
691                    "Reaction {} added to message {} by {}",
692                    emoji, message_id, user
693                );
694                Ok(())
695            }
696            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to add reaction: {}", e)),
697            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
698        }
699    }
700
701    /// Remove a reaction from a message
702    pub async fn remove_reaction(
703        &self,
704        message_id: MessageId,
705        emoji: String,
706        user: crate::messaging::user_handle::UserHandle,
707    ) -> Result<()> {
708        let conn = self.pool.get().await?;
709        let msg_id = message_id.to_string();
710        let user_str = user.as_str().to_string();
711        let emoji_clone = emoji.clone();
712
713        let result = conn
714            .interact(move |conn| -> Result<(), rusqlite::Error> {
715                conn.execute(
716                    "DELETE FROM reactions 
717                 WHERE message_id = ?1 AND emoji = ?2 AND user_id = ?3",
718                    params![msg_id, emoji_clone, user_str],
719                )?;
720                Ok(())
721            })
722            .await;
723
724        match result {
725            Ok(Ok(())) => {
726                debug!(
727                    "Reaction {} removed from message {} by {}",
728                    emoji, message_id, user
729                );
730                Ok(())
731            }
732            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to remove reaction: {}", e)),
733            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
734        }
735    }
736
737    /// Get thread messages
738    pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
739        let conn = self.pool.get().await?;
740        let thread_str = thread_id.to_string();
741
742        let result = conn
743            .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
744                let mut stmt = conn.prepare(
745                    "SELECT id FROM messages 
746                 WHERE thread_id = ?1 AND deleted_at IS NULL
747                 ORDER BY created_at ASC",
748                )?;
749
750                let rows = stmt.query_map(params![thread_str], |row| {
751                    let id_str: String = row.get("id")?;
752                    Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
753                        rusqlite::Error::InvalidColumnType(
754                            0,
755                            "id".to_string(),
756                            rusqlite::types::Type::Text,
757                        )
758                    })
759                })?;
760
761                let mut message_ids = Vec::new();
762                for row in rows {
763                    message_ids.push(row?);
764                }
765                Ok(message_ids)
766            })
767            .await;
768
769        match result {
770            Ok(Ok(message_ids)) => {
771                let mut messages = Vec::new();
772                for id in message_ids {
773                    if let Ok(msg) = self.get_message(id).await {
774                        messages.push(msg);
775                    }
776                }
777                Ok(messages)
778            }
779            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get thread messages: {}", e)),
780            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
781        }
782    }
783
784    /// Get database statistics
785    pub async fn get_stats(&self) -> Result<DatabaseStats> {
786        let conn = self.pool.get().await?;
787
788        let result = conn
789            .interact(|conn| -> Result<DatabaseStats, rusqlite::Error> {
790                let mut total_messages: i64 = 0;
791                let mut total_attachments: i64 = 0;
792                let mut total_reactions: i64 = 0;
793                conn.query_row("SELECT COUNT(*) FROM messages", [], |row| {
794                    total_messages = row.get(0)?;
795                    Ok(())
796                })?;
797
798                conn.query_row("SELECT COUNT(*) FROM attachments", [], |row| {
799                    total_attachments = row.get(0)?;
800                    Ok(())
801                })?;
802
803                conn.query_row("SELECT COUNT(*) FROM reactions", [], |row| {
804                    total_reactions = row.get(0)?;
805                    Ok(())
806                })?;
807
808                // Get database page count and page size to calculate size
809                let page_count: i64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
810                let page_size: i64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
811                let db_size = page_count * page_size;
812
813                Ok(DatabaseStats {
814                    total_messages: total_messages as u64,
815                    total_attachments: total_attachments as u64,
816                    total_reactions: total_reactions as u64,
817                    database_size_bytes: db_size as u64,
818                })
819            })
820            .await;
821
822        match result {
823            Ok(Ok(stats)) => Ok(stats),
824            Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get database stats: {}", e)),
825            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
826        }
827    }
828
829    /// Clean up ephemeral messages
830    pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
831        let conn = self.pool.get().await?;
832        let cutoff_time = chrono::Utc::now().timestamp_millis() - (ttl_seconds * 1000);
833
834        let result = conn
835            .interact(move |conn| -> Result<usize, rusqlite::Error> {
836                let changes = conn.execute(
837                    "DELETE FROM messages 
838                 WHERE ephemeral = 1 AND created_at < ?1",
839                    params![cutoff_time],
840                )?;
841                Ok(changes)
842            })
843            .await;
844
845        match result {
846            Ok(Ok(count)) => {
847                info!("Cleaned up {} ephemeral messages", count);
848                Ok(count)
849            }
850            Ok(Err(e)) => Err(anyhow::anyhow!(
851                "Failed to cleanup ephemeral messages: {}",
852                e
853            )),
854            Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
855        }
856    }
857}
858
/// Aggregate statistics about the message database, as produced by
/// `DatabaseMessageStore::get_stats`.
///
/// `Default` yields all-zero statistics, and `PartialEq`/`Eq` allow two
/// snapshots to be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DatabaseStats {
    /// Number of rows in the `messages` table.
    pub total_messages: u64,
    /// Number of rows in the `attachments` table.
    pub total_attachments: u64,
    /// Number of rows in the `reactions` table.
    pub total_reactions: u64,
    /// Database size in bytes, computed as SQLite `page_count * page_size`.
    pub database_size_bytes: u64,
}
867
/// Compatibility alias: `MessageStore` names the same type as
/// [`DatabaseMessageStore`].
pub type MessageStore = DatabaseMessageStore;