1use crate::messaging::types::*;
5use crate::messaging::user_handle::UserHandle;
6use anyhow::Result;
7use chrono::{DateTime, TimeZone, Utc};
8use deadpool_sqlite::{Config, Pool, Runtime};
9use rusqlite::params;
10use serde_json;
11use std::collections::HashMap;
12use std::path::PathBuf;
13use tracing::{debug, info};
14use uuid::Uuid;
15
16pub use crate::dht::client::DhtClient;
17
18#[derive(Clone)]
20pub struct DatabaseMessageStore {
21 pool: Pool,
22 #[allow(dead_code)] dht_client: DhtClient,
24}
25
26impl DatabaseMessageStore {
27 pub async fn new(dht_client: DhtClient, db_path: Option<PathBuf>) -> Result<Self> {
29 let db_path = db_path.unwrap_or_else(|| {
30 let mut path = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
31 path.push("saorsa");
32 std::fs::create_dir_all(&path).unwrap_or(());
33 path.push("messages.db");
34 path
35 });
36
37 info!("Initializing message database at: {:?}", db_path);
38
39 let cfg = Config::new(db_path);
41 let pool = cfg.create_pool(Runtime::Tokio1)?;
42
43 let store = Self { pool, dht_client };
44
45 store.init_schema().await?;
47
48 Ok(store)
49 }
50
51 async fn init_schema(&self) -> Result<()> {
53 let conn = self.pool.get().await?;
54
55 let result = conn.interact(|conn| -> Result<(), rusqlite::Error> {
56 conn.execute("PRAGMA journal_mode = WAL", [])?;
58 conn.execute("PRAGMA synchronous = NORMAL", [])?;
59 conn.execute("PRAGMA cache_size = -64000", [])?; conn.execute("PRAGMA foreign_keys = ON", [])?;
61 conn.execute("PRAGMA temp_store = MEMORY", [])?;
62 conn.execute("PRAGMA mmap_size = 268435456", [])?; conn.execute(
66 "CREATE TABLE IF NOT EXISTS messages (
67 id TEXT PRIMARY KEY NOT NULL,
68 channel_id TEXT NOT NULL,
69 sender TEXT NOT NULL,
70 content TEXT NOT NULL,
71 thread_id TEXT,
72 reply_to TEXT,
73 created_at INTEGER NOT NULL,
74 edited_at INTEGER,
75 deleted_at INTEGER,
76 ephemeral INTEGER DEFAULT 0,
77 signature TEXT NOT NULL DEFAULT ''
78 )",
79 [],
80 )?;
81
82 conn.execute(
84 "CREATE TABLE IF NOT EXISTS attachments (
85 id TEXT PRIMARY KEY NOT NULL,
86 message_id TEXT NOT NULL,
87 filename TEXT NOT NULL,
88 mime_type TEXT NOT NULL,
89 size_bytes INTEGER NOT NULL,
90 dht_hash TEXT NOT NULL,
91 thumbnail BLOB,
92 metadata TEXT DEFAULT '{}',
93 FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
94 )",
95 [],
96 )?;
97
98 conn.execute(
100 "CREATE TABLE IF NOT EXISTS reactions (
101 id INTEGER PRIMARY KEY AUTOINCREMENT,
102 message_id TEXT NOT NULL,
103 emoji TEXT NOT NULL,
104 user_id TEXT NOT NULL,
105 created_at INTEGER NOT NULL,
106 FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE,
107 UNIQUE(message_id, emoji, user_id)
108 )",
109 [],
110 )?;
111
112 conn.execute(
114 "CREATE TABLE IF NOT EXISTS mentions (
115 id INTEGER PRIMARY KEY AUTOINCREMENT,
116 message_id TEXT NOT NULL,
117 user TEXT NOT NULL,
118 FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
119 )",
120 [],
121 )?;
122
123 conn.execute(
125 "CREATE TABLE IF NOT EXISTS read_receipts (
126 message_id TEXT NOT NULL,
127 user_id TEXT NOT NULL,
128 read_at INTEGER NOT NULL,
129 PRIMARY KEY (message_id, user_id),
130 FOREIGN KEY (message_id) REFERENCES messages(id) ON DELETE CASCADE
131 )",
132 [],
133 )?;
134
135 conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_channel ON messages(channel_id, created_at DESC)", [])?;
137 conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_thread ON messages(thread_id, created_at)", [])?;
138 conn.execute("CREATE INDEX IF NOT EXISTS idx_messages_sender ON messages(sender)", [])?;
139 conn.execute("CREATE INDEX IF NOT EXISTS idx_attachments_message ON attachments(message_id)", [])?;
140 conn.execute("CREATE INDEX IF NOT EXISTS idx_reactions_message ON reactions(message_id)", [])?;
141 conn.execute("CREATE INDEX IF NOT EXISTS idx_mentions_user ON mentions(user)", [])?;
142
143 Ok(())
144 }).await;
145
146 match result {
147 Ok(Ok(())) => {
148 info!("Database schema initialized successfully");
149 Ok(())
150 }
151 Ok(Err(e)) => Err(anyhow::anyhow!("Database schema creation failed: {}", e)),
152 Err(e) => Err(anyhow::anyhow!(
153 "Failed to execute schema initialization: {}",
154 e
155 )),
156 }
157 }
158
159 pub async fn store_message(&self, message: &RichMessage) -> Result<()> {
161 let conn = self.pool.get().await?;
162
163 let content_json = serde_json::to_string(&message.content)?;
165 let message_clone = message.clone();
166
167 let result = conn
168 .interact(move |conn| -> Result<(), rusqlite::Error> {
169 let tx = conn.transaction()?;
170
171 tx.execute(
173 "INSERT OR REPLACE INTO messages (
174 id, channel_id, sender, content, thread_id, reply_to,
175 created_at, edited_at, deleted_at, ephemeral, signature
176 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
177 params![
178 message_clone.id.to_string(),
179 message_clone.channel_id.to_string(),
180 message_clone.sender.to_string(),
181 content_json,
182 message_clone.thread_id.as_ref().map(|id| id.to_string()),
183 message_clone.reply_to.as_ref().map(|id| id.to_string()),
184 message_clone.created_at.timestamp_millis(),
185 message_clone
186 .edited_at
187 .as_ref()
188 .map(|dt| dt.timestamp_millis()),
189 message_clone
190 .deleted_at
191 .as_ref()
192 .map(|dt| dt.timestamp_millis()),
193 message_clone.ephemeral as i32,
194 hex::encode(&message_clone.signature.signature)
195 ],
196 )?;
197
198 for attachment in &message_clone.attachments {
200 let metadata_json =
201 serde_json::to_string(&attachment.metadata).unwrap_or_default();
202
203 tx.execute(
204 "INSERT OR REPLACE INTO attachments (
205 id, message_id, filename, mime_type, size_bytes,
206 dht_hash, thumbnail, metadata
207 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
208 params![
209 attachment.id,
210 message_clone.id.to_string(),
211 attachment.filename,
212 attachment.mime_type,
213 attachment.size_bytes,
214 attachment.dht_hash,
215 attachment.thumbnail.as_ref(),
216 metadata_json
217 ],
218 )?;
219 }
220
221 for mention in &message_clone.mentions {
223 tx.execute(
224 "INSERT OR IGNORE INTO mentions (message_id, user) VALUES (?1, ?2)",
225 params![message_clone.id.to_string(), mention.to_string()],
226 )?;
227 }
228
229 for (emoji, users) in &message_clone.reactions {
231 for user in users {
232 tx.execute(
233 "INSERT OR IGNORE INTO reactions (message_id, emoji, user_id, created_at)
234 VALUES (?1, ?2, ?3, ?4)",
235 params![
236 message_clone.id.to_string(),
237 emoji,
238 user.to_string(),
239 chrono::Utc::now().timestamp_millis()
240 ],
241 )?;
242 }
243 }
244
245 tx.commit()?;
246 Ok(())
247 })
248 .await;
249
250 match result {
251 Ok(Ok(())) => {
252 debug!("Message stored successfully: {}", message.id);
253 Ok(())
254 }
255 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to store message: {}", e)),
256 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
257 }
258 }
259
260 pub async fn get_message(&self, id: MessageId) -> Result<RichMessage> {
262 let conn = self.pool.get().await?;
263 let message_id = id.to_string();
264
265 let result = conn
266 .interact(move |conn| -> Result<RichMessage, rusqlite::Error> {
267 let mut stmt = conn.prepare(
268 "SELECT id, channel_id, sender, content, thread_id, reply_to,
269 created_at, edited_at, deleted_at, ephemeral, signature
270 FROM messages WHERE id = ?1",
271 )?;
272
273 let row = stmt.query_row(params![message_id], |row| {
274 let content_json: String = row.get("content")?;
275 let content: MessageContent =
276 serde_json::from_str(&content_json).map_err(|_| {
277 rusqlite::Error::InvalidColumnType(
278 0,
279 "content".to_string(),
280 rusqlite::types::Type::Text,
281 )
282 })?;
283
284 let created_at = Utc
285 .timestamp_millis_opt(row.get("created_at")?)
286 .single()
287 .ok_or(rusqlite::Error::InvalidColumnType(
288 0,
289 "created_at".to_string(),
290 rusqlite::types::Type::Integer,
291 ))?;
292
293 let edited_at: Option<i64> = row.get("edited_at")?;
294 let edited_at = edited_at.and_then(|ts| Utc.timestamp_millis_opt(ts).single());
295
296 let deleted_at: Option<i64> = row.get("deleted_at")?;
297 let deleted_at =
298 deleted_at.and_then(|ts| Utc.timestamp_millis_opt(ts).single());
299
300 let thread_id: Option<String> = row.get("thread_id")?;
301 let thread_id = thread_id.and_then(|s| Uuid::parse_str(&s).ok().map(ThreadId));
302
303 let reply_to: Option<String> = row.get("reply_to")?;
304 let reply_to = reply_to.and_then(|s| Uuid::parse_str(&s).ok().map(MessageId));
305
306 let signature_hex: String = row.get("signature")?;
307 let signature_bytes = hex::decode(&signature_hex).unwrap_or_default();
308
309 Ok(RichMessage {
310 id: MessageId(Uuid::parse_str(&row.get::<_, String>("id")?).map_err(
311 |_| {
312 rusqlite::Error::InvalidColumnType(
313 0,
314 "id".to_string(),
315 rusqlite::types::Type::Text,
316 )
317 },
318 )?),
319 channel_id: ChannelId(
320 Uuid::parse_str(&row.get::<_, String>("channel_id")?).map_err(
321 |_| {
322 rusqlite::Error::InvalidColumnType(
323 0,
324 "channel_id".to_string(),
325 rusqlite::types::Type::Text,
326 )
327 },
328 )?,
329 ),
330 sender: UserHandle::from(row.get::<_, String>("sender")?),
331 content,
332 thread_id,
333 reply_to,
334 created_at,
335 edited_at,
336 deleted_at,
337 ephemeral: row.get::<_, i32>("ephemeral")? != 0,
338 attachments: Vec::new(), mentions: Vec::new(), reactions: HashMap::new(), read_by: HashMap::new(),
342 delivered_to: HashMap::new(),
343 expires_at: None,
344 thread_count: 0,
345 last_thread_reply: None,
346 sender_device: crate::messaging::types::DeviceId("primary".to_string()),
347 encryption: EncryptionMethod::E2E,
348 signature: MessageSignature {
349 algorithm: "ed25519".to_string(),
350 signature: signature_bytes,
351 },
352 })
353 })?;
354
355 Ok(row)
356 })
357 .await;
358
359 match result {
360 Ok(Ok(mut message)) => {
361 message.attachments = self.get_attachments(message.id).await?;
363 message.mentions = self.get_mentions(message.id).await?;
364 message.reactions = self.get_reactions(message.id).await?;
365 Ok(message)
366 }
367 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to retrieve message: {}", e)),
368 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
369 }
370 }
371
372 async fn get_attachments(&self, message_id: MessageId) -> Result<Vec<Attachment>> {
374 let conn = self.pool.get().await?;
375 let msg_id = message_id.to_string();
376
377 let result = conn
378 .interact(move |conn| -> Result<Vec<Attachment>, rusqlite::Error> {
379 let mut stmt = conn.prepare(
380 "SELECT id, filename, mime_type, size_bytes, dht_hash, thumbnail, metadata
381 FROM attachments WHERE message_id = ?1",
382 )?;
383
384 let rows = stmt.query_map(params![msg_id], |row| {
385 let metadata_json: String = row.get("metadata")?;
386 let metadata: HashMap<String, String> =
387 serde_json::from_str(&metadata_json).unwrap_or_default();
388
389 let thumbnail: Option<Vec<u8>> = row.get("thumbnail")?;
390
391 Ok(Attachment {
392 id: row.get("id")?,
393 filename: row.get("filename")?,
394 mime_type: row.get("mime_type")?,
395 size_bytes: row.get("size_bytes")?,
396 dht_hash: row.get("dht_hash")?,
397 thumbnail,
398 metadata,
399 encryption_key: None, })
401 })?;
402
403 let mut attachments = Vec::new();
404 for row in rows {
405 attachments.push(row?);
406 }
407 Ok(attachments)
408 })
409 .await;
410
411 match result {
412 Ok(Ok(attachments)) => Ok(attachments),
413 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get attachments: {}", e)),
414 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
415 }
416 }
417
418 async fn get_mentions(&self, message_id: MessageId) -> Result<Vec<crate::messaging::user_handle::UserHandle>> {
420 let conn = self.pool.get().await?;
421 let msg_id = message_id.to_string();
422
423 let result = conn
424 .interact(move |conn| -> Result<Vec<String>, rusqlite::Error> {
425 let mut stmt = conn.prepare("SELECT user FROM mentions WHERE message_id = ?1")?;
426 let rows = stmt.query_map(params![msg_id], |row| row.get::<_, String>("user"))?;
427
428 let mut mentions = Vec::new();
429 for row in rows {
430 mentions.push(row?);
431 }
432 Ok(mentions)
433 })
434 .await;
435
436 match result {
437 Ok(Ok(user_strings)) => Ok(
438 user_strings
439 .into_iter()
440 .map(crate::messaging::user_handle::UserHandle::from)
441 .collect(),
442 ),
443 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get mentions: {}", e)),
444 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
445 }
446 }
447
448 async fn get_reactions(
450 &self,
451 message_id: MessageId,
452 ) -> Result<HashMap<String, Vec<crate::messaging::user_handle::UserHandle>>> {
453 let conn = self.pool.get().await?;
454 let msg_id = message_id.to_string();
455
456 let result = conn
457 .interact(
458 move |conn| -> Result<HashMap<String, Vec<String>>, rusqlite::Error> {
459 let mut stmt = conn.prepare(
460 "SELECT emoji, user_id FROM reactions WHERE message_id = ?1 ORDER BY created_at"
461 )?;
462 let rows = stmt.query_map(params![msg_id], |row| {
463 Ok((
464 row.get::<_, String>("emoji")?,
465 row.get::<_, String>("user_id")?,
466 ))
467 })?;
468
469 let mut reactions: HashMap<String, Vec<String>> = HashMap::new();
470 for row in rows {
471 let (emoji, user_id) = row?;
472 reactions.entry(emoji).or_default().push(user_id);
473 }
474 Ok(reactions)
475 },
476 )
477 .await;
478
479 match result {
480 Ok(Ok(reaction_strings)) => Ok(
481 reaction_strings
482 .into_iter()
483 .map(|(emoji, users)| {
484 let handles = users
485 .into_iter()
486 .map(crate::messaging::user_handle::UserHandle::from)
487 .collect();
488 (emoji, handles)
489 })
490 .collect(),
491 ),
492 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get reactions: {}", e)),
493 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
494 }
495 }
496
497 pub async fn update_message(&self, message: &RichMessage) -> Result<()> {
499 self.store_message(message).await
502 }
503
504 pub async fn get_channel_messages(
506 &self,
507 channel_id: ChannelId,
508 limit: usize,
509 before: Option<DateTime<Utc>>,
510 ) -> Result<Vec<RichMessage>> {
511 let conn = self.pool.get().await?;
512 let chan_id = channel_id.to_string();
513 let before_ts = before.map(|dt| dt.timestamp_millis()).unwrap_or(i64::MAX);
514
515 let result = conn
516 .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
517 let mut stmt = conn.prepare(
518 "SELECT id FROM messages
519 WHERE channel_id = ?1 AND created_at < ?2 AND deleted_at IS NULL
520 ORDER BY created_at DESC LIMIT ?3",
521 )?;
522
523 let rows = stmt.query_map(params![chan_id, before_ts, limit as i64], |row| {
524 let id_str: String = row.get("id")?;
525 Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
526 rusqlite::Error::InvalidColumnType(
527 0,
528 "id".to_string(),
529 rusqlite::types::Type::Text,
530 )
531 })
532 })?;
533
534 let mut message_ids = Vec::new();
535 for row in rows {
536 message_ids.push(row?);
537 }
538 Ok(message_ids)
539 })
540 .await;
541
542 match result {
543 Ok(Ok(message_ids)) => {
544 let mut messages = Vec::new();
545 for id in message_ids {
546 if let Ok(msg) = self.get_message(id).await {
547 messages.push(msg);
548 }
549 }
550 Ok(messages)
551 }
552 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get channel messages: {}", e)),
553 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
554 }
555 }
556
557 pub async fn mark_as_read(&self, message_id: MessageId, user: crate::messaging::user_handle::UserHandle) -> Result<()> {
559 let conn = self.pool.get().await?;
560 let msg_id = message_id.to_string();
561 let user_str = user.as_str().to_string();
562
563 let result = conn
564 .interact(move |conn| -> Result<(), rusqlite::Error> {
565 conn.execute(
566 "INSERT OR REPLACE INTO read_receipts (message_id, user_id, read_at)
567 VALUES (?1, ?2, ?3)",
568 params![msg_id, user_str, chrono::Utc::now().timestamp_millis()],
569 )?;
570 Ok(())
571 })
572 .await;
573
574 match result {
575 Ok(Ok(())) => {
576 debug!("Message {} marked as read by {}", message_id, user);
577 Ok(())
578 }
579 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to mark message as read: {}", e)),
580 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
581 }
582 }
583
584 pub async fn search_messages(
586 &self,
587 query: &str,
588 channel_id: Option<ChannelId>,
589 limit: usize,
590 ) -> Result<Vec<RichMessage>> {
591 let conn = self.pool.get().await?;
592 let search_query = format!("%{}%", query);
593 let chan_filter = channel_id.map(|id| id.to_string());
594
595 let result = conn
596 .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
597 let (sql, params): (String, Vec<Box<dyn rusqlite::ToSql + Send>>) =
598 if let Some(channel) = chan_filter {
599 (
600 "SELECT id FROM messages
601 WHERE content LIKE ?1 AND channel_id = ?2 AND deleted_at IS NULL
602 ORDER BY created_at DESC LIMIT ?3"
603 .to_string(),
604 vec![
605 Box::new(search_query),
606 Box::new(channel),
607 Box::new(limit as i64),
608 ],
609 )
610 } else {
611 (
612 "SELECT id FROM messages
613 WHERE content LIKE ?1 AND deleted_at IS NULL
614 ORDER BY created_at DESC LIMIT ?2"
615 .to_string(),
616 vec![Box::new(search_query), Box::new(limit as i64)],
617 )
618 };
619
620 let mut stmt = conn.prepare(&sql)?;
621 let param_refs: Vec<&dyn rusqlite::ToSql> = params
622 .iter()
623 .map(|p| p.as_ref() as &dyn rusqlite::ToSql)
624 .collect();
625
626 let rows = stmt.query_map(param_refs.as_slice(), |row| {
627 let id_str: String = row.get("id")?;
628 Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
629 rusqlite::Error::InvalidColumnType(
630 0,
631 "id".to_string(),
632 rusqlite::types::Type::Text,
633 )
634 })
635 })?;
636
637 let mut message_ids = Vec::new();
638 for row in rows {
639 message_ids.push(row?);
640 }
641 Ok(message_ids)
642 })
643 .await;
644
645 match result {
646 Ok(Ok(message_ids)) => {
647 let mut messages = Vec::new();
648 for id in message_ids {
649 if let Ok(msg) = self.get_message(id).await {
650 messages.push(msg);
651 }
652 }
653 Ok(messages)
654 }
655 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to search messages: {}", e)),
656 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
657 }
658 }
659
660 pub async fn add_reaction(
662 &self,
663 message_id: MessageId,
664 emoji: String,
665 user: crate::messaging::user_handle::UserHandle,
666 ) -> Result<()> {
667 let conn = self.pool.get().await?;
668 let msg_id = message_id.to_string();
669 let user_str = user.as_str().to_string();
670 let emoji_clone = emoji.clone();
671
672 let result = conn
673 .interact(move |conn| -> Result<(), rusqlite::Error> {
674 conn.execute(
675 "INSERT OR IGNORE INTO reactions (message_id, emoji, user_id, created_at)
676 VALUES (?1, ?2, ?3, ?4)",
677 params![
678 msg_id,
679 emoji_clone,
680 user_str,
681 chrono::Utc::now().timestamp_millis()
682 ],
683 )?;
684 Ok(())
685 })
686 .await;
687
688 match result {
689 Ok(Ok(())) => {
690 debug!(
691 "Reaction {} added to message {} by {}",
692 emoji, message_id, user
693 );
694 Ok(())
695 }
696 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to add reaction: {}", e)),
697 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
698 }
699 }
700
701 pub async fn remove_reaction(
703 &self,
704 message_id: MessageId,
705 emoji: String,
706 user: crate::messaging::user_handle::UserHandle,
707 ) -> Result<()> {
708 let conn = self.pool.get().await?;
709 let msg_id = message_id.to_string();
710 let user_str = user.as_str().to_string();
711 let emoji_clone = emoji.clone();
712
713 let result = conn
714 .interact(move |conn| -> Result<(), rusqlite::Error> {
715 conn.execute(
716 "DELETE FROM reactions
717 WHERE message_id = ?1 AND emoji = ?2 AND user_id = ?3",
718 params![msg_id, emoji_clone, user_str],
719 )?;
720 Ok(())
721 })
722 .await;
723
724 match result {
725 Ok(Ok(())) => {
726 debug!(
727 "Reaction {} removed from message {} by {}",
728 emoji, message_id, user
729 );
730 Ok(())
731 }
732 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to remove reaction: {}", e)),
733 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
734 }
735 }
736
737 pub async fn get_thread_messages(&self, thread_id: ThreadId) -> Result<Vec<RichMessage>> {
739 let conn = self.pool.get().await?;
740 let thread_str = thread_id.to_string();
741
742 let result = conn
743 .interact(move |conn| -> Result<Vec<MessageId>, rusqlite::Error> {
744 let mut stmt = conn.prepare(
745 "SELECT id FROM messages
746 WHERE thread_id = ?1 AND deleted_at IS NULL
747 ORDER BY created_at ASC",
748 )?;
749
750 let rows = stmt.query_map(params![thread_str], |row| {
751 let id_str: String = row.get("id")?;
752 Uuid::parse_str(&id_str).map(MessageId).map_err(|_| {
753 rusqlite::Error::InvalidColumnType(
754 0,
755 "id".to_string(),
756 rusqlite::types::Type::Text,
757 )
758 })
759 })?;
760
761 let mut message_ids = Vec::new();
762 for row in rows {
763 message_ids.push(row?);
764 }
765 Ok(message_ids)
766 })
767 .await;
768
769 match result {
770 Ok(Ok(message_ids)) => {
771 let mut messages = Vec::new();
772 for id in message_ids {
773 if let Ok(msg) = self.get_message(id).await {
774 messages.push(msg);
775 }
776 }
777 Ok(messages)
778 }
779 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get thread messages: {}", e)),
780 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
781 }
782 }
783
784 pub async fn get_stats(&self) -> Result<DatabaseStats> {
786 let conn = self.pool.get().await?;
787
788 let result = conn
789 .interact(|conn| -> Result<DatabaseStats, rusqlite::Error> {
790 let mut total_messages: i64 = 0;
791 let mut total_attachments: i64 = 0;
792 let mut total_reactions: i64 = 0;
793 conn.query_row("SELECT COUNT(*) FROM messages", [], |row| {
794 total_messages = row.get(0)?;
795 Ok(())
796 })?;
797
798 conn.query_row("SELECT COUNT(*) FROM attachments", [], |row| {
799 total_attachments = row.get(0)?;
800 Ok(())
801 })?;
802
803 conn.query_row("SELECT COUNT(*) FROM reactions", [], |row| {
804 total_reactions = row.get(0)?;
805 Ok(())
806 })?;
807
808 let page_count: i64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
810 let page_size: i64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
811 let db_size = page_count * page_size;
812
813 Ok(DatabaseStats {
814 total_messages: total_messages as u64,
815 total_attachments: total_attachments as u64,
816 total_reactions: total_reactions as u64,
817 database_size_bytes: db_size as u64,
818 })
819 })
820 .await;
821
822 match result {
823 Ok(Ok(stats)) => Ok(stats),
824 Ok(Err(e)) => Err(anyhow::anyhow!("Failed to get database stats: {}", e)),
825 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
826 }
827 }
828
829 pub async fn cleanup_ephemeral(&self, ttl_seconds: i64) -> Result<usize> {
831 let conn = self.pool.get().await?;
832 let cutoff_time = chrono::Utc::now().timestamp_millis() - (ttl_seconds * 1000);
833
834 let result = conn
835 .interact(move |conn| -> Result<usize, rusqlite::Error> {
836 let changes = conn.execute(
837 "DELETE FROM messages
838 WHERE ephemeral = 1 AND created_at < ?1",
839 params![cutoff_time],
840 )?;
841 Ok(changes)
842 })
843 .await;
844
845 match result {
846 Ok(Ok(count)) => {
847 info!("Cleaned up {} ephemeral messages", count);
848 Ok(count)
849 }
850 Ok(Err(e)) => Err(anyhow::anyhow!(
851 "Failed to cleanup ephemeral messages: {}",
852 e
853 )),
854 Err(e) => Err(anyhow::anyhow!("Database interaction failed: {}", e)),
855 }
856 }
857}
858
859#[derive(Debug, Clone)]
861pub struct DatabaseStats {
862 pub total_messages: u64,
863 pub total_attachments: u64,
864 pub total_reactions: u64,
865 pub database_size_bytes: u64,
866}
867
868pub type MessageStore = DatabaseMessageStore;