use std::path::PathBuf;
use std::sync::Mutex;
use anyhow::{anyhow, Context, Result};
use pim_core::NodeId;
use rusqlite::{params, Connection};
use serde::{Deserialize, Serialize};
use crate::hex_node_id;
const SCHEMA: &str = include_str!("schema.sql");
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MessageDirection {
Sent,
Received,
}
impl MessageDirection {
fn as_str(&self) -> &'static str {
match self {
MessageDirection::Sent => "sent",
MessageDirection::Received => "received",
}
}
fn from_str(s: &str) -> Result<Self> {
match s {
"sent" => Ok(Self::Sent),
"received" => Ok(Self::Received),
other => Err(anyhow!("unknown direction: {other}")),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum MessageStatus {
Pending,
Sent,
Delivered,
Read,
Failed,
}
impl MessageStatus {
fn as_str(&self) -> &'static str {
match self {
MessageStatus::Pending => "pending",
MessageStatus::Sent => "sent",
MessageStatus::Delivered => "delivered",
MessageStatus::Read => "read",
MessageStatus::Failed => "failed",
}
}
fn from_str(s: &str) -> Result<Self> {
match s {
"pending" => Ok(Self::Pending),
"sent" => Ok(Self::Sent),
"delivered" => Ok(Self::Delivered),
"read" => Ok(Self::Read),
"failed" => Ok(Self::Failed),
other => Err(anyhow!("unknown status: {other}")),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckKind {
Delivered = 1,
Read = 2,
}
impl AckKind {
pub fn from_u8(v: u8) -> Option<Self> {
match v {
1 => Some(Self::Delivered),
2 => Some(Self::Read),
_ => None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRecord {
pub id: String,
pub peer_node_id: String,
pub direction: MessageDirection,
pub body: String,
pub timestamp_ms: i64,
pub status: MessageStatus,
pub failure_reason: Option<String>,
pub delivered_at_ms: Option<i64>,
pub read_at_ms: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationSummary {
pub peer_node_id: String,
pub peer_node_id_short: String,
pub name: String,
pub last_message_preview: Option<String>,
pub last_message_ts_ms: Option<i64>,
pub unread_count: i64,
pub x25519_pubkey: Option<String>,
}
pub struct MessagingStorage {
conn: Mutex<Connection>,
}
impl MessagingStorage {
pub fn open(path: PathBuf) -> Result<Self> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create messages dir {}", parent.display()))?;
}
let conn = Connection::open(&path)
.with_context(|| format!("open messages db at {}", path.display()))?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
if let Ok(meta) = std::fs::metadata(&path) {
let mut perms = meta.permissions();
perms.set_mode(0o600);
let _ = std::fs::set_permissions(&path, perms);
}
}
conn.execute_batch(SCHEMA)
.context("apply messages schema")?;
Ok(Self {
conn: Mutex::new(conn),
})
}
pub fn insert_message(&self, m: &MessageRecord) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT OR REPLACE INTO messages \
(id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
m.id,
m.peer_node_id,
m.direction.as_str(),
m.body,
m.timestamp_ms,
m.status.as_str(),
m.failure_reason,
m.delivered_at_ms,
m.read_at_ms,
],
)?;
Ok(())
}
pub fn bump_conversation_after_local_send(
&self,
peer_id_hex: &str,
message_id_hex: &str,
ts_ms: i64,
body: &str,
) -> Result<()> {
let preview = preview_of(body);
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO conversations_meta (peer_node_id, unread_count, last_read_message_id, \
last_message_id, last_message_preview, last_message_ts_ms) \
VALUES (?1, 0, NULL, ?2, ?3, ?4) \
ON CONFLICT(peer_node_id) DO UPDATE SET \
last_message_id = excluded.last_message_id, \
last_message_preview = excluded.last_message_preview, \
last_message_ts_ms = excluded.last_message_ts_ms",
params![peer_id_hex, message_id_hex, preview, ts_ms],
)?;
Ok(())
}
pub fn bump_conversation_after_remote_receive(
&self,
peer_id_hex: &str,
message_id_hex: &str,
ts_ms: i64,
body: &str,
) -> Result<ConversationBump> {
let preview = preview_of(body);
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO conversations_meta (peer_node_id, unread_count, last_message_id, \
last_message_preview, last_message_ts_ms) \
VALUES (?1, 1, ?2, ?3, ?4) \
ON CONFLICT(peer_node_id) DO UPDATE SET \
unread_count = unread_count + 1, \
last_message_id = excluded.last_message_id, \
last_message_preview = excluded.last_message_preview, \
last_message_ts_ms = excluded.last_message_ts_ms",
params![peer_id_hex, message_id_hex, preview, ts_ms],
)?;
let unread_count: i64 = conn.query_row(
"SELECT unread_count FROM conversations_meta WHERE peer_node_id = ?1",
params![peer_id_hex],
|row| row.get::<_, i64>(0),
)?;
Ok(ConversationBump {
preview,
unread_count,
})
}
pub fn set_message_status(
&self,
message_id_hex: &str,
status: MessageStatus,
delivered_at_ms: Option<i64>,
read_at_ms: Option<i64>,
) -> Result<()> {
let conn = self.conn.lock().unwrap();
match (delivered_at_ms, read_at_ms) {
(Some(d), Some(r)) => {
conn.execute(
"UPDATE messages SET status = ?1, delivered_at_ms = ?2, read_at_ms = ?3 WHERE id = ?4",
params![status.as_str(), d, r, message_id_hex],
)?;
}
(Some(d), None) => {
conn.execute(
"UPDATE messages SET status = ?1, delivered_at_ms = COALESCE(delivered_at_ms, ?2) WHERE id = ?3",
params![status.as_str(), d, message_id_hex],
)?;
}
(None, Some(r)) => {
conn.execute(
"UPDATE messages SET status = ?1, read_at_ms = COALESCE(read_at_ms, ?2) WHERE id = ?3",
params![status.as_str(), r, message_id_hex],
)?;
}
(None, None) => {
conn.execute(
"UPDATE messages SET status = ?1 WHERE id = ?2",
params![status.as_str(), message_id_hex],
)?;
}
}
Ok(())
}
pub fn set_message_failed(&self, message_id_hex: &str, reason: &str, at_ms: i64) -> Result<()> {
let conn = self.conn.lock().unwrap();
conn.execute(
"UPDATE messages SET status = 'failed', failure_reason = ?1, delivered_at_ms = COALESCE(delivered_at_ms, ?2) WHERE id = ?3",
params![reason, at_ms, message_id_hex],
)?;
Ok(())
}
pub fn history(
&self,
peer_id_hex: &str,
before_ts_ms: Option<i64>,
limit: i64,
) -> Result<(Vec<MessageRecord>, bool)> {
let conn = self.conn.lock().unwrap();
let limit_plus = limit.saturating_add(1).max(2);
let mut stmt = match before_ts_ms {
Some(_) => conn.prepare(
"SELECT id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms \
FROM messages \
WHERE peer_node_id = ?1 AND timestamp_ms < ?2 \
ORDER BY timestamp_ms DESC, id DESC \
LIMIT ?3",
)?,
None => conn.prepare(
"SELECT id, peer_node_id, direction, body, timestamp_ms, status, failure_reason, delivered_at_ms, read_at_ms \
FROM messages \
WHERE peer_node_id = ?1 \
ORDER BY timestamp_ms DESC, id DESC \
LIMIT ?2",
)?,
};
let rows = match before_ts_ms {
Some(ts) => stmt.query_map(params![peer_id_hex, ts, limit_plus], parse_row)?,
None => stmt.query_map(params![peer_id_hex, limit_plus], parse_row)?,
};
let mut out: Vec<MessageRecord> = rows.collect::<Result<Vec<_>, _>>()?;
let has_more = out.len() as i64 > limit;
if has_more {
out.truncate(limit as usize);
}
Ok((out, has_more))
}
pub fn list_conversations_raw(&self) -> Result<Vec<ConversationSummary>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT peer_node_id, last_message_preview, last_message_ts_ms, unread_count \
FROM conversations_meta \
ORDER BY last_message_ts_ms DESC NULLS LAST",
)?;
let rows = stmt
.query_map([], |row| {
let peer_node_id: String = row.get(0)?;
let preview: Option<String> = row.get(1)?;
let ts: Option<i64> = row.get(2)?;
let unread: i64 = row.get(3)?;
let short = short_id(&peer_node_id);
Ok(ConversationSummary {
peer_node_id_short: short.clone(),
name: short,
peer_node_id,
last_message_preview: preview,
last_message_ts_ms: ts,
unread_count: unread,
x25519_pubkey: None,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(rows)
}
pub fn delete_conversation(&self, peer_id_hex: &str) -> Result<(usize, bool)> {
let mut conn = self.conn.lock().unwrap();
let tx = conn.transaction()?;
let deleted_messages = tx.execute(
"DELETE FROM messages WHERE peer_node_id = ?1",
params![peer_id_hex],
)?;
let deleted_meta = tx.execute(
"DELETE FROM conversations_meta WHERE peer_node_id = ?1",
params![peer_id_hex],
)?;
tx.commit()?;
Ok((deleted_messages, deleted_meta > 0))
}
pub fn delete_all_messages(&self) -> Result<(usize, usize)> {
let mut conn = self.conn.lock().unwrap();
let tx = conn.transaction()?;
let deleted_messages = tx.execute("DELETE FROM messages", [])?;
let deleted_meta = tx.execute("DELETE FROM conversations_meta", [])?;
tx.commit()?;
Ok((deleted_messages, deleted_meta))
}
pub fn mark_read_up_to(&self, peer_id_hex: &str, up_to_ts_ms: i64) -> Result<i64> {
let conn = self.conn.lock().unwrap();
conn.execute(
"UPDATE messages SET status = 'read', read_at_ms = COALESCE(read_at_ms, ?2) \
WHERE peer_node_id = ?1 AND direction = 'received' AND timestamp_ms <= ?2 AND status != 'read'",
params![peer_id_hex, up_to_ts_ms],
)?;
conn.execute(
"UPDATE conversations_meta SET unread_count = 0 WHERE peer_node_id = ?1",
params![peer_id_hex],
)?;
Ok(0)
}
pub fn delete_conversation_for_peer(&self, peer: &NodeId) -> Result<(usize, bool)> {
let hex = hex_node_id(peer);
self.delete_conversation(&hex)
}
}
#[derive(Debug, Clone)]
pub struct ConversationBump {
pub preview: String,
pub unread_count: i64,
}
fn parse_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<MessageRecord> {
let direction_str: String = row.get(2)?;
let status_str: String = row.get(5)?;
Ok(MessageRecord {
id: row.get(0)?,
peer_node_id: row.get(1)?,
direction: MessageDirection::from_str(&direction_str)
.map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()))?,
body: row.get(3)?,
timestamp_ms: row.get(4)?,
status: MessageStatus::from_str(&status_str)
.map_err(|e| rusqlite::Error::InvalidColumnName(e.to_string()))?,
failure_reason: row.get(6)?,
delivered_at_ms: row.get(7)?,
read_at_ms: row.get(8)?,
})
}
fn preview_of(body: &str) -> String {
let truncated: String = body.chars().take(80).collect();
truncated.replace('\n', " ")
}
pub(crate) fn short_id(hex: &str) -> String {
hex.chars().take(8).collect()
}