use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use duckdb::types::Value as DuckValue;
use uuid::Uuid;
use super::types::{now_secs, ActionId, Lifetime, Message, Severity};
use crate::storage::engine::StorageEngine;
const INIT_SQL: &str = "
CREATE TABLE IF NOT EXISTS pane_output_messages (
id TEXT NOT NULL PRIMARY KEY,
kind TEXT NOT NULL,
ts BIGINT NOT NULL, -- unix seconds
metadata_json TEXT NOT NULL,
actions_json TEXT NOT NULL,
severity TEXT NOT NULL,
lifetime_kind TEXT NOT NULL,
lifetime_value TEXT,
expires_at BIGINT, -- unix seconds; NULL = no wall-clock expiry
pinned BOOLEAN NOT NULL DEFAULT FALSE,
dismissed BOOLEAN NOT NULL DEFAULT FALSE,
dismissed_at BIGINT,
snoozed_until BIGINT,
group_key TEXT,
source_paragraph_id TEXT,
source_language_id TEXT,
trace_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_pom_active ON pane_output_messages(dismissed, expires_at, ts);
CREATE INDEX IF NOT EXISTS idx_pom_kind ON pane_output_messages(kind, ts);
CREATE INDEX IF NOT EXISTS idx_pom_group ON pane_output_messages(group_key, ts);
";
#[derive(Clone)]
pub struct OutputStore {
engine: Arc<StorageEngine>,
}
fn text(v: Option<&DuckValue>) -> Option<String> {
match v {
Some(DuckValue::Text(s)) => Some(s.clone()),
_ => None,
}
}
fn int(v: Option<&DuckValue>) -> Option<i64> {
match v {
Some(DuckValue::BigInt(i)) => Some(*i),
Some(DuckValue::Int(i)) => Some(*i as i64),
Some(DuckValue::HugeInt(i)) => Some(*i as i64),
_ => None,
}
}
fn boolean(v: Option<&DuckValue>) -> bool {
matches!(v, Some(DuckValue::Boolean(true)))
}
impl OutputStore {
pub fn open(path: &Path) -> Result<Self> {
let engine = StorageEngine::new(path, INIT_SQL, 2)?;
Ok(Self { engine: Arc::new(engine) })
}
pub fn open_for_project(project_root: &Path) -> Result<Self> {
Self::open(&project_root.join("output.db"))
}
pub fn emit(&self, message: &Message) -> Result<Uuid> {
let (lk, lv) = message.lifetime.to_columns();
let id = message.id.to_string();
let metadata = message.metadata.to_string();
let actions: Vec<&str> = message.actions.iter().map(|a| a.as_str()).collect();
let actions_json = serde_json::to_string(&actions).unwrap_or_else(|_| "[]".to_string());
let severity = message.severity.as_str().to_string();
let lifetime_kind = lk.to_string();
let group_key = message.group_key.clone();
let src_para = message.source_paragraph_id.map(|u| u.to_string());
let src_lang = message.source_language_id.clone();
let trace = message.trace_id.map(|u| u.to_string());
self.engine.execute_with(
"INSERT INTO pane_output_messages \
(id, kind, ts, metadata_json, actions_json, severity, lifetime_kind, \
lifetime_value, expires_at, pinned, dismissed, dismissed_at, snoozed_until, \
group_key, source_paragraph_id, source_language_id, trace_id) \
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
&[
&id,
&message.kind,
&message.timestamp,
&metadata,
&actions_json,
&severity,
&lifetime_kind,
&lv,
&message.expires_at,
&message.pinned,
&message.dismissed,
&message.dismissed_at,
&message.snoozed_until,
&group_key,
&src_para,
&src_lang,
&trace,
],
)?;
Ok(message.id)
}
const COLS: &'static str = "id, kind, ts, metadata_json, actions_json, severity, \
lifetime_kind, lifetime_value, expires_at, pinned, dismissed, dismissed_at, \
snoozed_until, group_key, source_paragraph_id, source_language_id, trace_id";
fn parse_row(row: &[DuckValue]) -> Option<Message> {
let g = |i: usize| row.get(i);
let id = Uuid::parse_str(&text(g(0))?).ok()?;
let metadata = text(g(3))
.and_then(|s| serde_json::from_str(&s).ok())
.unwrap_or(serde_json::Value::Null);
let actions: Vec<ActionId> = text(g(4))
.and_then(|s| serde_json::from_str::<Vec<String>>(&s).ok())
.unwrap_or_default()
.iter()
.filter_map(|s| ActionId::parse(s))
.collect();
Some(Message {
id,
kind: text(g(1))?,
timestamp: int(g(2)).unwrap_or(0),
metadata,
actions,
severity: Severity::parse(&text(g(5)).unwrap_or_default()),
lifetime: Lifetime::from_columns(
&text(g(6)).unwrap_or_default(),
text(g(7)).as_deref(),
),
expires_at: int(g(8)),
pinned: boolean(g(9)),
dismissed: boolean(g(10)),
dismissed_at: int(g(11)),
snoozed_until: int(g(12)),
group_key: text(g(13)),
source_paragraph_id: text(g(14)).and_then(|s| Uuid::parse_str(&s).ok()),
source_language_id: text(g(15)),
trace_id: text(g(16)).and_then(|s| Uuid::parse_str(&s).ok()),
})
}
pub fn active(&self) -> Result<Vec<Message>> {
let now = now_secs();
let sql = format!(
"SELECT {cols} FROM pane_output_messages \
WHERE NOT dismissed \
AND (expires_at IS NULL OR expires_at > ?) \
AND (snoozed_until IS NULL OR snoozed_until <= ?) \
ORDER BY pinned DESC, ts DESC",
cols = Self::COLS
);
let rows = self.engine.select_all_with(&sql, &[&now, &now])?;
Ok(rows.iter().filter_map(|r| Self::parse_row(r)).collect())
}
pub fn by_kind(&self, kind: &str) -> Result<Vec<Message>> {
let now = now_secs();
let sql = format!(
"SELECT {cols} FROM pane_output_messages \
WHERE NOT dismissed AND kind = ? \
AND (expires_at IS NULL OR expires_at > ?) \
AND (snoozed_until IS NULL OR snoozed_until <= ?) \
ORDER BY pinned DESC, ts DESC",
cols = Self::COLS
);
let rows = self.engine.select_all_with(&sql, &[&kind, &now, &now])?;
Ok(rows.iter().filter_map(|r| Self::parse_row(r)).collect())
}
pub fn count_active(&self, kind: Option<&str>) -> Result<usize> {
Ok(match kind {
Some(k) => self.by_kind(k)?.len(),
None => self.active()?.len(),
})
}
pub fn dismiss(&self, id: Uuid) -> Result<()> {
let id = id.to_string();
let now = now_secs();
self.engine.execute_with(
"UPDATE pane_output_messages SET dismissed = TRUE, dismissed_at = ? WHERE id = ?",
&[&now, &id],
)
}
pub fn set_pinned(&self, id: Uuid, pinned: bool) -> Result<()> {
let id = id.to_string();
self.engine.execute_with(
"UPDATE pane_output_messages SET pinned = ? WHERE id = ?",
&[&pinned, &id],
)
}
pub fn snooze(&self, id: Uuid, until: i64) -> Result<()> {
let id = id.to_string();
self.engine.execute_with(
"UPDATE pane_output_messages SET snoozed_until = ? WHERE id = ?",
&[&until, &id],
)
}
pub fn cleanup(&self) -> Result<usize> {
let now = now_secs();
self.engine.execute_with(
"DELETE FROM pane_output_messages \
WHERE expires_at IS NOT NULL AND expires_at < ? AND NOT pinned",
&[&now],
)?;
let session_rows = self.engine.select_all(
"SELECT DISTINCT kind, lifetime_value FROM pane_output_messages \
WHERE lifetime_kind = 'session'",
)?;
for row in &session_rows {
let kind = match text(row.first()) {
Some(k) => k,
None => continue,
};
let n: i64 = text(row.get(1)).and_then(|v| v.parse().ok()).unwrap_or(100);
self.engine.execute_with(
"DELETE FROM pane_output_messages \
WHERE kind = ? AND lifetime_kind = 'session' AND NOT pinned AND id IN ( \
SELECT id FROM pane_output_messages \
WHERE kind = ? AND lifetime_kind = 'session' AND NOT pinned \
ORDER BY ts DESC OFFSET ? \
)",
&[&kind, &kind, &n],
)?;
}
Ok(0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use super::super::types::{kinds, Lifetime, Severity};
fn store() -> OutputStore {
OutputStore::open(Path::new(":memory:")).unwrap()
}
fn msg(kind: &str, sev: Severity, life: Lifetime) -> Message {
Message::new(kind, sev, life, serde_json::json!({ "text": "hello" }))
}
#[test]
fn emit_and_query_active() {
let s = store();
let m = msg(kinds::BUND_PRINT, Severity::Info, Lifetime::Session(100));
let id = s.emit(&m).unwrap();
let active = s.active().unwrap();
assert_eq!(active.len(), 1);
assert_eq!(active[0].id, id);
assert_eq!(active[0].kind, kinds::BUND_PRINT);
assert_eq!(active[0].metadata["text"], "hello");
}
#[test]
fn dismiss_removes_from_active() {
let s = store();
let id = s.emit(&msg(kinds::BUND_PRINT, Severity::Info, Lifetime::Never)).unwrap();
assert_eq!(s.count_active(None).unwrap(), 1);
s.dismiss(id).unwrap();
assert_eq!(s.count_active(None).unwrap(), 0);
}
#[test]
fn pinned_sorts_first() {
let s = store();
let a = msg(kinds::BUND_PRINT, Severity::Info, Lifetime::Never);
std::thread::sleep(std::time::Duration::from_millis(2));
let b = msg(kinds::BUND_LOG, Severity::Info, Lifetime::Never);
s.emit(&a).unwrap();
let bid = s.emit(&b).unwrap();
s.set_pinned(a.id, true).unwrap();
let active = s.active().unwrap();
assert_eq!(active[0].id, a.id);
assert!(active.iter().any(|m| m.id == bid));
}
#[test]
fn snoozed_is_hidden_until_due() {
let s = store();
let id = s.emit(&msg(kinds::BUND_PRINT, Severity::Info, Lifetime::Never)).unwrap();
s.snooze(id, now_secs() + 3600).unwrap();
assert_eq!(s.count_active(None).unwrap(), 0);
s.snooze(id, now_secs() - 10).unwrap(); assert_eq!(s.count_active(None).unwrap(), 1);
}
#[test]
fn by_kind_filters() {
let s = store();
s.emit(&msg(kinds::BUND_PRINT, Severity::Info, Lifetime::Never)).unwrap();
s.emit(&msg(kinds::TRANSLATION_RESULT, Severity::Info, Lifetime::Never)).unwrap();
assert_eq!(s.by_kind(kinds::BUND_PRINT).unwrap().len(), 1);
assert_eq!(s.by_kind(kinds::TRANSLATION_RESULT).unwrap().len(), 1);
assert_eq!(s.count_active(None).unwrap(), 2);
}
#[test]
fn session_cleanup_trims_to_n() {
let s = store();
for _ in 0..5 {
s.emit(&msg(kinds::BUND_PRINT, Severity::Info, Lifetime::Session(3))).unwrap();
std::thread::sleep(std::time::Duration::from_millis(2));
}
s.cleanup().unwrap();
assert_eq!(s.by_kind(kinds::BUND_PRINT).unwrap().len(), 3);
}
#[test]
fn lexicon_proposal_envelope_round_trips() {
use super::super::types::ActionId;
let s = store();
let meta = serde_json::json!({
"text": "2 proposed word(s)",
"language": "Avesha",
"proposals": [
{ "form": "mara", "gloss": "ocean", "pos": "noun" },
{ "form": "tuli", "gloss": "wave", "pos": "noun" },
],
});
let m = Message::new(kinds::LEXICON_PROPOSAL, Severity::Info, Lifetime::UntilActedOn, meta)
.with_actions(vec![ActionId::Promote, ActionId::Dismiss]);
s.emit(&m).unwrap();
let back = &s.active().unwrap()[0];
assert_eq!(back.kind, kinds::LEXICON_PROPOSAL);
assert_eq!(back.metadata["proposals"].as_array().unwrap().len(), 2);
assert_eq!(back.metadata["proposals"][0]["form"], "mara");
assert!(back.actions.contains(&ActionId::Promote));
}
#[test]
fn ai_task_complete_carries_target_paragraph() {
use super::super::types::ActionId;
let s = store();
let target = uuid::Uuid::new_v4();
let m = Message::new(
kinds::AI_TASK_COMPLETE,
Severity::Info,
Lifetime::Hours(12.0),
serde_json::json!({ "text": "refresh done", "elapsed_seconds": 47 }),
)
.with_actions(vec![ActionId::Primary, ActionId::Dismiss, ActionId::Pin])
.with_source_paragraph(target);
s.emit(&m).unwrap();
let back = &s.active().unwrap()[0];
assert_eq!(back.kind, kinds::AI_TASK_COMPLETE);
assert_eq!(back.source_paragraph_id, Some(target));
assert!(back.actions.contains(&ActionId::Primary));
assert_eq!(back.metadata["elapsed_seconds"], 47);
}
}