use super::{Store, CONVERSATION_TIMEOUT_MINUTES};
use crate::error::MemoryError;
use uuid::Uuid;
impl Store {
/// Returns the id of the conversation that this sender's messages should join.
///
/// Looks for the most recently active `'active'` row matching `channel`,
/// `sender_id` and `project` whose `last_activity` is newer than
/// `CONVERSATION_TIMEOUT_MINUTES` ago. When one exists, its `last_activity`
/// and `updated_at` are set to the current time and its id is returned;
/// otherwise a new `'active'` row is inserted under a fresh v4 UUID.
///
/// The lookup and the follow-up write are issued as separate statements on
/// the pool, not inside a transaction.
///
/// # Errors
///
/// Returns a [`MemoryError`] built by `MemoryError::sqlite` if the lookup,
/// the timestamp refresh, or the insert fails.
pub(crate) async fn get_or_create_conversation(
    &self,
    channel: &str,
    sender_id: &str,
    project: &str,
) -> Result<String, MemoryError> {
    const FIND_SQL: &str = "SELECT id FROM conversations \
        WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active' \
        AND datetime(last_activity) > datetime('now', ? || ' minutes') \
        ORDER BY last_activity DESC LIMIT 1";
    const TOUCH_SQL: &str =
        "UPDATE conversations SET last_activity = datetime('now'), updated_at = datetime('now') WHERE id = ?";
    const INSERT_SQL: &str =
        "INSERT INTO conversations (id, channel, sender_id, project, status, last_activity) \
        VALUES (?, ?, ?, ?, 'active', datetime('now'))";

    // The timeout is bound negated so SQLite sees a modifier like "-N minutes".
    let live = sqlx::query_as::<_, (String,)>(FIND_SQL)
        .bind(channel)
        .bind(sender_id)
        .bind(project)
        .bind(-CONVERSATION_TIMEOUT_MINUTES)
        .fetch_optional(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))?;

    match live {
        Some((existing_id,)) => {
            // Reuse the conversation and mark it as just-used.
            sqlx::query(TOUCH_SQL)
                .bind(&existing_id)
                .execute(&self.pool)
                .await
                .map_err(|err| MemoryError::sqlite("update failed", err))?;
            Ok(existing_id)
        }
        None => {
            // Nothing recent enough: start a new conversation.
            let fresh_id = Uuid::new_v4().to_string();
            sqlx::query(INSERT_SQL)
                .bind(&fresh_id)
                .bind(channel)
                .bind(sender_id)
                .bind(project)
                .execute(&self.pool)
                .await
                .map_err(|err| MemoryError::sqlite("insert failed", err))?;
            Ok(fresh_id)
        }
    }
}
/// Lists `'active'` conversations whose `last_activity` is at least
/// `CONVERSATION_TIMEOUT_MINUTES` old.
///
/// Each tuple is `(id, channel, sender_id, project)`.
///
/// # Errors
///
/// Returns a [`MemoryError`] built by `MemoryError::sqlite` if the query fails.
pub async fn find_idle_conversations(
    &self,
) -> Result<Vec<(String, String, String, String)>, MemoryError> {
    const SQL: &str = "SELECT id, channel, sender_id, project FROM conversations \
        WHERE status = 'active' \
        AND datetime(last_activity) <= datetime('now', ? || ' minutes')";

    // The timeout is bound negated so SQLite sees a modifier like "-N minutes".
    sqlx::query_as::<_, (String, String, String, String)>(SQL)
        .bind(-CONVERSATION_TIMEOUT_MINUTES)
        .fetch_all(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))
}
/// Lists every conversation whose status is `'active'`, regardless of age.
///
/// Each tuple is `(id, channel, sender_id, project)`.
///
/// # Errors
///
/// Returns a [`MemoryError`] built by `MemoryError::sqlite` if the query fails.
pub async fn find_all_active_conversations(
    &self,
) -> Result<Vec<(String, String, String, String)>, MemoryError> {
    const SQL: &str =
        "SELECT id, channel, sender_id, project FROM conversations WHERE status = 'active'";

    sqlx::query_as::<_, (String, String, String, String)>(SQL)
        .fetch_all(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))
}
/// Fetches the messages of one conversation as `(role, content)` pairs,
/// ordered by ascending `timestamp`.
///
/// # Errors
///
/// Returns a [`MemoryError`] built by `MemoryError::sqlite` if the query fails.
pub async fn get_conversation_messages(
    &self,
    conversation_id: &str,
) -> Result<Vec<(String, String)>, MemoryError> {
    const SQL: &str = "SELECT role, content FROM messages \
        WHERE conversation_id = ? ORDER BY timestamp ASC";

    sqlx::query_as::<_, (String, String)>(SQL)
        .bind(conversation_id)
        .fetch_all(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))
}
/// Marks the conversation with the given id as `'closed'`, stores `summary`
/// on it, and sets `updated_at` to the current time.
///
/// Succeeds whether or not a row with that id exists; the number of affected
/// rows is not inspected.
///
/// # Errors
///
/// Returns a [`MemoryError`] built by `MemoryError::sqlite` if the update fails.
pub async fn close_conversation(
    &self,
    conversation_id: &str,
    summary: &str,
) -> Result<(), MemoryError> {
    const SQL: &str =
        "UPDATE conversations SET status = 'closed', summary = ?, updated_at = datetime('now') WHERE id = ?";

    sqlx::query(SQL)
        .bind(summary)
        .bind(conversation_id)
        .execute(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("update failed", err))
        .map(|_| ())
}
/// Closes every `'active'` conversation matching `channel`, `sender_id` and
/// `project`, setting `updated_at` to the current time. No summary is written.
///
/// Returns `true` when at least one row was updated, `false` otherwise.
///
/// # Errors
///
/// Returns a [`MemoryError`] built by `MemoryError::sqlite` if the update fails.
pub async fn close_current_conversation(
    &self,
    channel: &str,
    sender_id: &str,
    project: &str,
) -> Result<bool, MemoryError> {
    const SQL: &str = "UPDATE conversations SET status = 'closed', updated_at = datetime('now') \
        WHERE channel = ? AND sender_id = ? AND project = ? AND status = 'active'";

    let closed_rows = sqlx::query(SQL)
        .bind(channel)
        .bind(sender_id)
        .bind(project)
        .execute(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("update failed", err))?
        .rows_affected();

    Ok(closed_rows != 0)
}
/// Fetches up to `limit` `(summary, updated_at)` pairs from `'closed'`
/// conversations of the given `channel` and `sender_id` that have a non-NULL
/// summary, newest `updated_at` first.
///
/// `limit` is bound directly into the SQL `LIMIT` clause.
///
/// # Errors
///
/// Returns a [`MemoryError`] built by `MemoryError::sqlite` if the query fails.
pub async fn get_recent_summaries(
    &self,
    channel: &str,
    sender_id: &str,
    limit: i64,
) -> Result<Vec<(String, String)>, MemoryError> {
    const SQL: &str = "SELECT summary, updated_at FROM conversations \
        WHERE channel = ? AND sender_id = ? AND status = 'closed' AND summary IS NOT NULL \
        ORDER BY updated_at DESC LIMIT ?";

    sqlx::query_as::<_, (String, String)>(SQL)
        .bind(channel)
        .bind(sender_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))
}
/// Fetches up to `limit` `(summary, updated_at)` pairs from all `'closed'`
/// conversations that have a non-NULL summary, newest `updated_at` first,
/// without filtering by channel or sender.
///
/// `limit` is bound directly into the SQL `LIMIT` clause.
///
/// # Errors
///
/// Returns a [`MemoryError`] built by `MemoryError::sqlite` if the query fails.
pub async fn get_all_recent_summaries(
    &self,
    limit: i64,
) -> Result<Vec<(String, String)>, MemoryError> {
    const SQL: &str = "SELECT summary, updated_at FROM conversations \
        WHERE status = 'closed' AND summary IS NOT NULL \
        ORDER BY updated_at DESC LIMIT ?";

    sqlx::query_as::<_, (String, String)>(SQL)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))
}
/// Fetches up to `limit` `'closed'` conversations for `channel` and
/// `sender_id` as [`crate::types::HistoryRow`] values, newest `updated_at`
/// first.
///
/// A NULL summary is returned as the text `(no summary)`. The `updated_at`
/// column is read as text and converted with
/// `crate::types::parse_sqlite_timestamp`.
///
/// # Errors
///
/// Returns a [`MemoryError`] if the query fails, or the error produced by
/// `parse_sqlite_timestamp` for the first row whose timestamp cannot be
/// parsed.
pub async fn get_history(
    &self,
    channel: &str,
    sender_id: &str,
    limit: i64,
) -> Result<Vec<crate::types::HistoryRow>, MemoryError> {
    const SQL: &str =
        "SELECT id, COALESCE(summary, '(no summary)'), updated_at FROM conversations \
        WHERE channel = ? AND sender_id = ? AND status = 'closed' \
        ORDER BY updated_at DESC LIMIT ?";

    let raw_rows = sqlx::query_as::<_, (String, String, String)>(SQL)
        .bind(channel)
        .bind(sender_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))?;

    // Collecting into `Result` stops at the first timestamp that fails to parse.
    raw_rows
        .into_iter()
        .map(
            |(conversation_id, summary, stamp)| -> Result<crate::types::HistoryRow, MemoryError> {
                Ok(crate::types::HistoryRow {
                    conversation_id,
                    summary,
                    updated_at: crate::types::parse_sqlite_timestamp(&stamp)?,
                })
            },
        )
        .collect()
}
/// Gathers per-sender row counts.
///
/// Returns `(conversations, messages, observations, facts)` where:
/// - `conversations` counts rows in `conversations` for `sender_id`;
/// - `messages` counts rows in `messages` joined to those conversations;
/// - `observations` is whatever `self.count_observations(sender_id)` reports;
/// - `facts` counts rows in `facts` for `sender_id` with `deleted_at IS NULL`.
///
/// The four counts come from separate queries run one after another.
///
/// # Errors
///
/// Returns a [`MemoryError`] from the first query that fails, including any
/// error propagated from `count_observations`.
pub async fn get_memory_stats(
    &self,
    sender_id: &str,
) -> Result<(i64, i64, i64, i64), MemoryError> {
    const CONVERSATIONS_SQL: &str = "SELECT COUNT(*) FROM conversations WHERE sender_id = ?";
    const MESSAGES_SQL: &str = "SELECT COUNT(*) FROM messages m \
        JOIN conversations c ON m.conversation_id = c.id \
        WHERE c.sender_id = ?";
    const FACTS_SQL: &str = "SELECT COUNT(*) FROM facts \
        WHERE sender_id = ? AND deleted_at IS NULL";

    let (conversations,) = sqlx::query_as::<_, (i64,)>(CONVERSATIONS_SQL)
        .bind(sender_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))?;

    let (messages,) = sqlx::query_as::<_, (i64,)>(MESSAGES_SQL)
        .bind(sender_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))?;

    let observations = self.count_observations(sender_id).await?;

    let (facts,) = sqlx::query_as::<_, (i64,)>(FACTS_SQL)
        .bind(sender_id)
        .fetch_one(&self.pool)
        .await
        .map_err(|err| MemoryError::sqlite("query failed", err))?;

    Ok((conversations, messages, observations, facts))
}
}