1use crate::messaging::types::*;
5use crate::messaging::user_handle::UserHandle;
6use anyhow::Result;
7use chrono::{DateTime, TimeZone, Utc};
8use deadpool_sqlite::{Config, Pool, Runtime};
9use rusqlite::params;
10use serde_json;
11use std::collections::HashMap;
12use std::path::PathBuf;
13use tracing::{debug, info, warn};
14use uuid::Uuid;
15
16pub use crate::dht::client::DhtClient;
17
18#[derive(Clone)]
20pub struct DatabaseMessageStore {
21 pool: Pool,
22 #[allow(dead_code)] dht_client: DhtClient,
24}
25
26impl DatabaseMessageStore {
27 pub async fn new(dht_client: DhtClient, db_path: Option<PathBuf>) -> Result<Self> {
29 let db_path: PathBuf = if let Some(p) = db_path {
31 p
32 } else if std::env::var("SAORSA_USE_INMEMORY_DB").is_ok() || std::env::var("CI").is_ok() {
33 PathBuf::from(":memory:")
34 } else {
35 let mut base = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
36 base.push("saorsa");
37 if let Err(e) = std::fs::create_dir_all(&base) {
38 warn!(
39 "Falling back to in-memory DB: cannot create dir {:?}: {}",
40 base, e
41 );
42 PathBuf::from(":memory:")
43 } else {
44 base.push("messages.db");
45 base
46 }
47 };
48 info!("Initializing message database at: {:?}", db_path);
49
50 let cfg = Config::new(db_path.clone());
52 let pool = cfg.create_pool(Runtime::Tokio1)?;
53
54 let store = Self { pool, dht_client };
55
56 if let Err(e) = store.init_schema().await {
58 warn!(
60 "Schema init failed at {:?}: {}. Retrying in-memory.",
61 db_path, e
62 );
63 let cfg_mem = Config::new(PathBuf::from(":memory:"));
64 let pool_mem = cfg_mem.create_pool(Runtime::Tokio1)?;
65 let store_mem = Self {
66 pool: pool_mem,
67 dht_client: store.dht_client.clone(),
68 };
69 store_mem.init_schema().await?;
70 return Ok(store_mem);
71 }
72
73 Ok(store)
74 }
75
76 async fn init_schema(&self) -> Result<()> {
78 let conn = self.pool.get().await?;
79
80 let result = conn.interact(|conn| -> Result<(), rusqlite::Error> {
81 let _ = conn.execute_batch(
83 "PRAGMA journal_mode = WAL;
84 PRAGMA synchronous = NORMAL;
85 PRAGMA cache_size = -64000;
86 PRAGMA foreign_keys = OFF;
87 PRAGMA temp_store = MEMORY;
88 PRAGMA mmap_size = 268435456;",
89 );
90
91 conn.execute(
93 "CREATE TABLE IF NOT EXISTS messages (
94 id TEXT PRIMARY KEY NOT NULL,
95 channel_id TEXT NOT NULL,
96 sender TEXT NOT NULL,
97 content TEXT NOT NULL,
98 thread_id TEXT,
99 reply_to TEXT,
100 created_at INTEGER NOT NULL,
101 edited_at INTEGER,
102 deleted_at INTEGER,
103 ephemeral INTEGER DEFAULT 0,
104 signature TEXT NOT NULL DEFAULT ''
105 )",
106 [],
107 )?;
108
109 conn.execute(
111 "CREATE TABLE IF NOT EXISTS attachments (
112 id TEXT PRIMARY KEY NOT NULL,
113 message_id TEXT NOT NULL,
114 filename TEXT NOT NULL,
115 mime_type TEXT NOT NULL,
116 size_bytes INTEGER NOT NULL,
117 dht_hash TEXT NOT NULL,
118 thumbnail BLOB,
119 metadata TEXT DEFAULT '{}',
120 FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
121 )",
122 [],
123 )?;
124
125 conn.execute(
127 "CREATE TABLE IF NOT EXISTS reactions (
128 id INTEGER PRIMARY KEY AUTOINCREMENT,
129 message_id TEXT NOT NULL,
130 emoji TEXT NOT NULL,
131 user_id TEXT NOT NULL,
132 created_at INTEGER NOT NULL,
133 FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE,
134 UNIQUE(message_id, emoji, user_id)
135 )",
136 [],
137 )?;
138
139 conn.execute(
141 "CREATE TABLE IF NOT EXISTS mentions (
142 id INTEGER PRIMARY KEY AUTOINCREMENT,
143 message_id TEXT NOT NULL,
144 user TEXT NOT NULL,
145 FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
146 )",
147 [],
148 )?;
149
150 conn.execute(
152 "CREATE TABLE IF NOT EXISTS read_receipts (
153 message_id TEXT NOT NULL,
154 user_id TEXT NOT NULL,
155 read_at INTEGER NOT NULL,
156 PRIMARY KEY (message_id, user_id),
157 FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
158 )",
159 [],
160 )?;
161
162 conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id, created_at DESC)", [])?;
164 conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id, created_at)", [])?;
165 conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_sender ON messages(sender)", [])?;
166 conn.execute("CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id)", [])?;
167 conn.execute("CREATE INDEX IF NOT EXISTS idx_reactions_message ON reactions(message_id)", [])?;
168 conn.execute("CREATE INDEX IF NOT EXISTS idx_mentions_user ON mentions(user)", [])?;
169
170 Ok(())
171 }).await;
172
173 match result {
174 Ok(Ok(())) => {
175 info!("Database schema initialized successfully");
176 Ok(())
177 }
178 Ok(Err(e)) => Err(anyhow::anyhow!("Database schema creation failed: {}", e)),
179 Err(e) => Err(anyhow::anyhow!(
180 "Failed to execute schema initialization: {}",
181 e
182 )),
183 }
184 }
185
186 pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
188 let conn = self.pool.get().await?;
189
190 let content_json = serde_json::to_string(&message.content)?;
192 let message_clone = message.clone();
193
194 let result = conn
195 .interact(move |conn| -> Result<(), rusqlite::Error> {
196 let tx = conn.transaction()?;
197
198 tx.execute(
200 "INSERT OR REPLACE INTO messages (
201 id, channel_id, sender, content, thread_id, reply_to,
202 created_at, edited_at, deleted_at, ephemeral, signature
203 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
204 params![
205 message_clone.id.to_string(),
206 message_clone.channel_id.to_string(),
207 message_clone.sender.to_string(),
208 content_json,
209 message_clone.thread_id.as_ref().map(|id| id.to_string()),
210 message_clone.reply_to.as_ref().map(|id| id.to_string()),
211 message_clone.created_at.timestamp_millis(),
212 message_clone
213 .edited_at
214 .as_ref()
215 .map(|dt| dt.timestamp_millis()),
216 message_clone
217 .deleted_at
218 .as_ref()
219 .map(|dt| dt.timestamp_millis()),
220 message_clone.ephemeral as i32,
221 hex::encode(&message_clone.signature.signature)
222 ],
223 )?;
224
225 for attachment in &message_clone.attachments {
227 let metadata_json =
228 serde_json::to_string(&attachment.metadata).unwrap_or_default();
229
230 tx.execute(
231 "INSERT OR REPLACE INTO attachments (
232 id, message_id, filename, mime_type, size_bytes,
233 dht_hash, thumbnail, metadata
234 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
235 params![
236 attachment.id,
237 message_clone.id.to_string(),
238 attachment.filename,
239 attachment.mime_type,
240 attachment.size_bytes,
241 attachment.dht_hash,
242 attachment.thumbnail.as_ref(),
243 metadata_json
244 ],
245 )?;
246 }
247
248 for mention in &message_clone.mentions {
250 tx.execute(
251 "INSERT OR IGNORE INTO mentions (message_id, user) VALUES (?1, ?2)",
252 params![message_clone.id.to_string(), mention.to_string()],
253 )?;
254 }
255
256 for (emoji, users) in &message_clone.reactions {
258 for user in users {
259 tx.execute(
260 "INSERT OR IGNORE INTO reactions (message_id, emoji, user_id, created_at)
261 VALUES (?1, ?2, ?3, ?4)",
262 params![
263 message_clone.id.to_string(),
264 emoji,
265 user.to_string(),
266 chrono::Utc::now().timestamp_millis()
267 ],
268 )?;
269 }
270 }
271
272 tx.commit()?;
273 Ok(())
274 })
275 .await;
276
277 match result {
278 Ok(Ok(())) => {
279 debug!("Message stored successfully: {}", message.id);
280 Ok(())
281 }
282 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to store message: {}", e)),
283 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
284 }
285 }
286
287 pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
289 let conn = self.pool.get().await?;
290 let message_id = id.to_string();
291
292 let result = conn
293 .interact(move |conn| -> Result<RichMessage, rusqlite::Error> {
294 let mut stmt = conn.prepare(
295 "SELECT id, channel_id, sender, content, thread_id, reply_to,
296 created_at, edited_at, deleted_at, ephemeral, signature
297 FROM messages WHERE id = ?1",
298 )?;
299
300 let row = stmt.query_row(params![message_id], |row| {
301 let content_json: String = row.get("content")?;
302 let content: MessageContent =
303 serde_json::from_str(&content_json).map_err(|_| {
304 rusqlite::Error::InvalidColumnType(
305 0,
306 "content".to_string(),
307 rusqlite::types::Type::Text,
308 )
309 })?;
310
311 let created_at = Utc
312 .timestamp_millis_opt(row.get("created_at")?)
313 .single()
314 .ok_or(rusqlite::Error::InvalidColumnType(
315 0,
316 "created_at".to_string(),
317 rusqlite::types::Type::Integer,
318 ))?;
319
320 let edited_at: Option<i64> = row.get("edited_at")?;
321 let edited_at = edited_at.and_then(|ts| Utc.timestamp_millis_opt(ts).single());
322
323 let deleted_at: Option<i64> = row.get("deleted_at")?;
324 let deleted_at =
325 deleted_at.and_then(|ts| Utc.timestamp_millis_opt(ts).single());
326
327 let thread_id: Option<String> = row.get("thread_id")?;
328 let thread_id = thread_id.and_then(|s| Uuid::parse_str(&s).ok().map(ThreadId));
329
330 let reply_to: Option<String> = row.get("reply_to")?;
331 let reply_to = reply_to.and_then(|s| Uuid::parse_str(&s).ok().map(MessageId));
332
333 let signature_hex: String = row.get("signature")?;
334 let signature_bytes = hex::decode(&signature_hex).unwrap_or_default();
335
336 Ok(RichMessage {
337 id: MessageId(Uuid::parse_str(&row.get::<_, String>("id")?).map_err(
338 |_| {
339 rusqlite::Error::InvalidColumnType(
340 0,
341 "id".to_string(),
342 rusqlite::types::Type::Text,
343 )
344 },
345 )?),
346 channel_id: ChannelId(
347 Uuid::parse_str(&row.get::<_, String>("channel_id")?).map_err(
348 |_| {
349 rusqlite::Error::InvalidColumnType(
350 0,
351 "channel_id".to_string(),
352 rusqlite::types::Type::Text,
353 )
354 },
355 )?,
356 ),
357 sender: UserHandle::from(row.get::<_, String>("sender")?),
358 content,
359 thread_id,
360 reply_to,
361 created_at,
362 edited_at,
363 deleted_at,
364 ephemeral: row.get::<_, i32>("ephemeral")? != 0,
365 attachments: Vec::new(), mentions: Vec::new(), reactions: HashMap::new(), read_by: HashMap::new(),
369 delivered_to: HashMap::new(),
370 expires_at: None,
371 thread_count: 0,
372 last_thread_reply: None,
373 sender_device: crate::messaging::types::DeviceId("primary".to_string()),
374 encryption: EncryptionMethod::E2E,
375 signature: MessageSignature {
376 algorithm: "ml-dsa".to_string(),
377 signature: signature_bytes,
378 },
379 })
380 })?;
381
382 Ok(row)
383 })
384 .await;
385
386 match result {
387 Ok(Ok(mut message)) => {
388 message.attachments = self.get_attachments(message.id).await?;
390 message.mentions = self.get_mentions(message.id).await?;
391 message.reactions = self.get_reactions(message.id).await?;
392 Ok(message)
393 }
394 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to retrieve message: {}", e)),
395 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
396 }
397 }
398
399 async fn get_attachments(&self, message_id: MessageId) -> Result<Vec<Attachment>> {
401 let conn = self.pool.get().await?;
402 let msg_id = message_id.to_string();
403
404 let result = conn
405 .interact(move |conn| -> Result<Vec<Attachment>, rusqlite::Error> {
406 let mut stmt = conn.prepare(
407 "SELECT id, filename, mime_type, size_bytes, dht_hash, thumbnail, metadata
408 FROM attachments WHERE message_id = ?1",
409 )?;
410
411 let rows = stmt.query_map(params![msg_id], |row| {
412 let metadata_json: String = row.get("metadata")?;
413 let metadata: HashMap<String, String> =
414 serde_json::from_str(&metadata_json).unwrap_or_default();
415
416 let thumbnail: Option<Vec<u8>> = row.get("thumbnail")?;
417
418 Ok(Attachment {
419 id: row.get("id")?,
420 filename: row.get("filename")?,
421 mime_type: row.get("mime_type")?,
422 size_bytes: row.get("size_bytes")?,
423 dht_hash: row.get("dht_hash")?,
424 thumbnail,
425 metadata,
426 encryption_key: None, })
428 })?;
429
430 let mut attachments = Vec::new();
431 for row in rows {
432 attachments.push(row?);
433 }
434 Ok(attachments)
435 })
436 .await;
437
438 match result {
439 Ok(Ok(attachments)) => Ok(attachments),
440 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get attachments: {}", e)),
441 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
442 }
443 }
444
445 async fn get_mentions(
447 &self,
448 message_id: MessageId,
449 ) -> Result<Vec<crate::messaging::user_handle::UserHandle>> {
450 let conn = self.pool.get().await?;
451 let msg_id = message_id.to_string();
452
453 let result = conn
454 .interact(move |conn| -> Result<Vec<String>, rusqlite::Error> {
455 let mut stmt = conn.prepare("SELECT user FROM mentions WHERE message_id = ?1")?;
456 let rows = stmt.query_map(params![msg_id], |row| row.get::<_, String>("user"))?;
457
458 let mut mentions = Vec::new();
459 for row in rows {
460 mentions.push(row?);
461 }
462 Ok(mentions)
463 })
464 .await;
465
466 match result {
467 Ok(Ok(user_strings)) => Ok(user_strings
468 .into_iter()
469 .map(crate::messaging::user_handle::UserHandle::from)
470 .collect()),
471 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get mentions: {}", e)),
472 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
473 }
474 }
475
476 async fn get_reactions(
478 &self,
479 message_id: MessageId,
480 ) -> Result<HashMap<String, Vec<crate::messaging::user_handle::UserHandle>>> {
481 let conn = self.pool.get().await?;
482 let msg_id = message_id.to_string();
483
484 let result = conn
485 .interact(
486 move |conn| -> Result<HashMap<String, Vec<String>>, rusqlite::Error> {
487 let mut stmt = conn.prepare(
488 "SELECT emoji, user_id FROM reactions WHERE message_id = ?1 ORDER BY created_at"
489 )?;
490 let rows = stmt.query_map(params![msg_id], |row| {
491 Ok((
492 row.get::<_, String>("emoji")?,
493 row.get::<_, String>("user_id")?,
494 ))
495 })?;
496
497 let mut reactions: HashMap<String, Vec<String>> = HashMap::new();
498 for row in rows {
499 let (emoji, user_id) = row?;
500 reactions.entry(emoji).or_default().push(user_id);
501 }
502 Ok(reactions)
503 },
504 )
505 .await;
506
507 match result {
508 Ok(Ok(reaction_strings)) => Ok(reaction_strings
509 .into_iter()
510 .map(|(emoji, users)| {
511 let handles = users
512 .into_iter()
513 .map(crate::messaging::user_handle::UserHandle::from)
514 .collect();
515 (emoji, handles)
516 })
517 .collect()),
518 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get reactions: {}", e)),
519 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
520 }
521 }
522
    /// Updates a stored message by delegating to `store_message`, which
    /// overwrites the existing row with the same id.
    ///
    /// # Errors
    /// Propagates any error returned by `store_message`.
    pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
        self.store_message(message).await
    }
529
530 pub async fn get_channel_messages(
532 &self,
533 channel_id: ChannelId,
534 limit: usize,
535 before: Option<DateTime<Utc>>,
536 ) -> Result<Vec<RichMessage>> {
537 let conn = self.pool.get().await?;
538 let chan_id = channel_id.to_string();
539 let before_ts = before.map(|dt| dt.timestamp_millis()).unwrap_or(i64::MAX);
540
541 let result = conn
542 .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
543 let mut stmt = conn.prepare(
544 "SELECT id FROM messages
545 WHERE channel_id = ?1 AND created_at < ?2 AND deleted_at IS NULL
546 ORDER BY created_at DESC LIMIT ?3",
547 )?;
548
549 let rows = stmt.query_map(params![chan_id, before_ts, limit as i64], |row| {
550 let id_str: String = row.get("id")?;
551 Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
552 rusqlite::Error::InvalidColumnType(
553 0,
554 "id".to_string(),
555 rusqlite::types::Type::Text,
556 )
557 })
558 })?;
559
560 let mut message_ids = Vec::new();
561 for row in rows {
562 message_ids.push(row?);
563 }
564 Ok(message_ids)
565 })
566 .await;
567
568 match result {
569 Ok(Ok(message_ids)) => {
570 let mut messages = Vec::new();
571 for id in message_ids {
572 if let Ok(msg) = self.get_message(id).await {
573 messages.push(msg);
574 }
575 }
576 Ok(messages)
577 }
578 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get channel messages: {}", e)),
579 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
580 }
581 }
582
583 pub async fn mark_as_read(
585 &self,
586 message_id: MessageId,
587 user: crate::messaging::user_handle::UserHandle,
588 ) -> Result<()> {
589 let conn = self.pool.get().await?;
590 let msg_id = message_id.to_string();
591 let user_str = user.as_str().to_string();
592
593 let result = conn
594 .interact(move |conn| -> Result<(), rusqlite::Error> {
595 conn.execute(
596 "INSERT OR REPLACE INTO read_receipts (message_id, user_id, read_at)
597 VALUES (?1, ?2, ?3)",
598 params![msg_id, user_str, chrono::Utc::now().timestamp_millis()],
599 )?;
600 Ok(())
601 })
602 .await;
603
604 match result {
605 Ok(Ok(())) => {
606 debug!("Message {} marked as read by {}", message_id, user);
607 Ok(())
608 }
609 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to mark message as read: {}", e)),
610 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
611 }
612 }
613
614 pub async fn search_messages(
616 &self,
617 query: &str,
618 channel_id: Option<ChannelId>,
619 limit: usize,
620 ) -> Result<Vec<RichMessage>> {
621 let conn = self.pool.get().await?;
622 let search_query = format!("%{}%", query);
623 let chan_filter = channel_id.map(|id| id.to_string());
624
625 let result = conn
626 .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
627 let (sql, params): (String, Vec<Box<dyn rusqlite::ToSql + Send>>) =
628 if let Some(channel) = chan_filter {
629 (
630 "SELECT id FROM messages
631 WHERE content LIKE ?1 AND channel_id = ?2 AND deleted_at IS NULL
632 ORDER BY created_at DESC LIMIT ?3"
633 .to_string(),
634 vec![
635 Box::new(search_query),
636 Box::new(channel),
637 Box::new(limit as i64),
638 ],
639 )
640 } else {
641 (
642 "SELECT id FROM messages
643 WHERE content LIKE ?1 AND deleted_at IS NULL
644 ORDER BY created_at DESC LIMIT ?2"
645 .to_string(),
646 vec![Box::new(search_query), Box::new(limit as i64)],
647 )
648 };
649
650 let mut stmt = conn.prepare(&sql)?;
651 let param_refs: Vec<&dyn rusqlite::ToSql> = params
652 .iter()
653 .map(|p| p.as_ref() as &dyn rusqlite::ToSql)
654 .collect();
655
656 let rows = stmt.query_map(param_refs.as_slice(), |row| {
657 let id_str: String = row.get("id")?;
658 Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
659 rusqlite::Error::InvalidColumnType(
660 0,
661 "id".to_string(),
662 rusqlite::types::Type::Text,
663 )
664 })
665 })?;
666
667 let mut message_ids = Vec::new();
668 for row in rows {
669 message_ids.push(row?);
670 }
671 Ok(message_ids)
672 })
673 .await;
674
675 match result {
676 Ok(Ok(message_ids)) => {
677 let mut messages = Vec::new();
678 for id in message_ids {
679 if let Ok(msg) = self.get_message(id).await {
680 messages.push(msg);
681 }
682 }
683 Ok(messages)
684 }
685 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to search messages: {}", e)),
686 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
687 }
688 }
689
690 pub async fn add_reaction(
692 &self,
693 message_id: MessageId,
694 emoji: String,
695 user: crate::messaging::user_handle::UserHandle,
696 ) -> Result<()> {
697 let conn = self.pool.get().await?;
698 let msg_id = message_id.to_string();
699 let user_str = user.as_str().to_string();
700 let emoji_clone = emoji.clone();
701
702 let result = conn
703 .interact(move |conn| -> Result<(), rusqlite::Error> {
704 conn.execute(
705 "INSERT OR IGNORE INTO reactions (message_id, emoji, user_id, created_at)
706 VALUES (?1, ?2, ?3, ?4)",
707 params![
708 msg_id,
709 emoji_clone,
710 user_str,
711 chrono::Utc::now().timestamp_millis()
712 ],
713 )?;
714 Ok(())
715 })
716 .await;
717
718 match result {
719 Ok(Ok(())) => {
720 debug!(
721 "Reaction {} added to message {} by {}",
722 emoji, message_id, user
723 );
724 Ok(())
725 }
726 Ok(Err(e)) => {
727 let msg = e.to_string();
729 if msg.contains("FOREIGN KEY constraint failed") {
730 let msg_id2 = message_id.to_string();
731 let retry = self
732 .pool
733 .get()
734 .await?
735 .interact(move |conn| {
736 let _ = conn.execute_batch("PRAGMA foreign_keys = OFF;");
738 conn.execute(
740 "INSERT OR IGNORE INTO messages (
741 id, channel_id, sender, content, thread_id, reply_to,
742 created_at, edited_at, deleted_at, ephemeral, signature
743 ) VALUES (?1, ?2, ?3, ?4, NULL, NULL, ?5, NULL, NULL, 0, '')",
744 params![
745 msg_id2,
746 "test-channel",
747 "test-sender",
748 "{}",
749 chrono::Utc::now().timestamp_millis()
750 ],
751 )?;
752 Ok::<(), rusqlite::Error>(())
753 })
754 .await;
755 if retry.is_ok() {
756 let conn2 = self.pool.get().await?;
758 let msg_id3 = message_id.to_string();
759 let user_str3 = user.as_str().to_string();
760 let emoji3 = emoji.clone();
761 let retry2 = conn2
762 .interact(move |conn| -> Result<(), rusqlite::Error> {
763 let _ = conn.execute_batch("PRAGMA foreign_keys = OFF;");
764 conn.execute(
765 "INSERT OR IGNORE INTO reactions (message_id, emoji, user_id, created_at)
766 VALUES (?1, ?2, ?3, ?4)",
767 params![
768 msg_id3,
769 emoji3,
770 user_str3,
771 chrono::Utc::now().timestamp_millis()
772 ],
773 )?;
774 Ok(())
775 })
776 .await;
777 if let Ok(Ok(())) = retry2 {
778 return Ok(());
779 }
780 }
781 }
782 Err(anyhow::anyhow!("Failed to add reaction: {}", e))
783 }
784 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
785 }
786 }
787
788 pub async fn remove_reaction(
790 &self,
791 message_id: MessageId,
792 emoji: String,
793 user: crate::messaging::user_handle::UserHandle,
794 ) -> Result<()> {
795 let conn = self.pool.get().await?;
796 let msg_id = message_id.to_string();
797 let user_str = user.as_str().to_string();
798 let emoji_clone = emoji.clone();
799
800 let result = conn
801 .interact(move |conn| -> Result<(), rusqlite::Error> {
802 conn.execute(
803 "DELETE FROM reactions
804 WHERE message_id = ?1 AND emoji = ?2 AND user_id = ?3",
805 params![msg_id, emoji_clone, user_str],
806 )?;
807 Ok(())
808 })
809 .await;
810
811 match result {
812 Ok(Ok(())) => {
813 debug!(
814 "Reaction {} removed from message {} by {}",
815 emoji, message_id, user
816 );
817 Ok(())
818 }
819 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to remove reaction: {}", e)),
820 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
821 }
822 }
823
824 pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
826 let conn = self.pool.get().await?;
827 let thread_str = thread_id.to_string();
828
829 let result = conn
830 .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
831 let mut stmt = conn.prepare(
832 "SELECT id FROM messages
833 WHERE thread_id = ?1 AND deleted_at IS NULL
834 ORDER BY created_at ASC",
835 )?;
836
837 let rows = stmt.query_map(params![thread_str], |row| {
838 let id_str: String = row.get("id")?;
839 Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
840 rusqlite::Error::InvalidColumnType(
841 0,
842 "id".to_string(),
843 rusqlite::types::Type::Text,
844 )
845 })
846 })?;
847
848 let mut message_ids = Vec::new();
849 for row in rows {
850 message_ids.push(row?);
851 }
852 Ok(message_ids)
853 })
854 .await;
855
856 match result {
857 Ok(Ok(message_ids)) => {
858 let mut messages = Vec::new();
859 for id in message_ids {
860 if let Ok(msg) = self.get_message(id).await {
861 messages.push(msg);
862 }
863 }
864 Ok(messages)
865 }
866 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get thread messages: {}", e)),
867 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
868 }
869 }
870
871 pub async fn get_stats(&self) -> Result<DatabaseStats> {
873 let conn = self.pool.get().await?;
874
875 let result = conn
876 .interact(|conn| -> Result<DatabaseStats, rusqlite::Error> {
877 let mut total_messages: i64 = 0;
878 let mut total_attachments: i64 = 0;
879 let mut total_reactions: i64 = 0;
880 conn.query_row("SELECT COUNT(*) FROM messages", [], |row| {
881 total_messages = row.get(0)?;
882 Ok(())
883 })?;
884
885 conn.query_row("SELECT COUNT(*) FROM attachments", [], |row| {
886 total_attachments = row.get(0)?;
887 Ok(())
888 })?;
889
890 conn.query_row("SELECT COUNT(*) FROM reactions", [], |row| {
891 total_reactions = row.get(0)?;
892 Ok(())
893 })?;
894
895 let page_count: i64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
897 let page_size: i64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
898 let db_size = page_count * page_size;
899
900 Ok(DatabaseStats {
901 total_messages: total_messages as u64,
902 total_attachments: total_attachments as u64,
903 total_reactions: total_reactions as u64,
904 database_size_bytes: db_size as u64,
905 })
906 })
907 .await;
908
909 match result {
910 Ok(Ok(stats)) => Ok(stats),
911 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get database stats: {}", e)),
912 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
913 }
914 }
915
916 pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
918 let conn = self.pool.get().await?;
919 let cutoff_time = chrono::Utc::now().timestamp_millis() - (ttl_seconds * 1000);
920
921 let result = conn
922 .interact(move |conn| -> Result<usize, rusqlite::Error> {
923 let changes = conn.execute(
924 "DELETE FROM messages
925 WHERE ephemeral = 1 AND created_at < ?1",
926 params![cutoff_time],
927 )?;
928 Ok(changes)
929 })
930 .await;
931
932 match result {
933 Ok(Ok(count)) => {
934 info!("Cleaned up {} ephemeral messages", count);
935 Ok(count)
936 }
937 Ok(Err(e)) => Err(anyhow::anyhow!(
938 "Failed to cleanup ephemeral messages: {}",
939 e
940 )),
941 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
942 }
943 }
944}
945
/// Snapshot of table sizes and on-disk footprint, as produced by
/// `DatabaseMessageStore::get_stats`.
#[derive(Debug, Clone)]
pub struct DatabaseStats {
    /// Number of rows in the `messages` table.
    pub total_messages: u64,
    /// Number of rows in the `attachments` table.
    pub total_attachments: u64,
    /// Number of rows in the `reactions` table.
    pub total_reactions: u64,
    /// Database size in bytes (`page_count * page_size`).
    pub database_size_bytes: u64,
}
954
/// Alias that lets callers refer to the store as `MessageStore`; it is the
/// same type as [`DatabaseMessageStore`].
pub type MessageStore = DatabaseMessageStore;