use anyhow::Result;
use chrono::Utc;
use rusqlite::{params, Connection, OptionalExtension, Transaction};
use super::connection::with_connection;
use super::types::SourceKind;
use crate::memory::config::MemoryConfig;
pub fn set_chunk_lifecycle_status(
config: &MemoryConfig,
chunk_id: &str,
status: &str,
) -> Result<()> {
with_connection(config, |conn| {
set_chunk_lifecycle_status_conn(conn, chunk_id, status)
})
}
#[allow(dead_code)]
pub(crate) fn set_chunk_lifecycle_status_tx(
tx: &Transaction<'_>,
chunk_id: &str,
status: &str,
) -> Result<()> {
set_chunk_lifecycle_status_conn(tx, chunk_id, status)
}
fn set_chunk_lifecycle_status_conn(conn: &Connection, chunk_id: &str, status: &str) -> Result<()> {
let changed = conn.execute(
"UPDATE mem_tree_chunks SET lifecycle_status = ?1 WHERE id = ?2",
params![status, chunk_id],
)?;
if changed == 0 {}
Ok(())
}
pub fn get_chunk_lifecycle_status(config: &MemoryConfig, chunk_id: &str) -> Result<Option<String>> {
with_connection(config, |conn| {
let row = conn
.query_row(
"SELECT lifecycle_status FROM mem_tree_chunks WHERE id = ?1",
params![chunk_id],
|r| r.get::<_, String>(0),
)
.optional()?;
Ok(row)
})
}
pub fn count_chunks_by_lifecycle_status(config: &MemoryConfig, status: &str) -> Result<u64> {
with_connection(config, |conn| {
let n: i64 = conn.query_row(
"SELECT COUNT(*) FROM mem_tree_chunks WHERE lifecycle_status = ?1",
params![status],
|r| r.get(0),
)?;
Ok(n.max(0) as u64)
})
}
pub fn is_source_ingested(
config: &MemoryConfig,
source_kind: SourceKind,
source_id: &str,
) -> Result<bool> {
with_connection(config, |conn| {
let n: i64 = conn.query_row(
"SELECT COUNT(*) FROM mem_tree_ingested_sources \
WHERE source_kind = ?1 AND source_id = ?2",
params![source_kind.as_str(), source_id],
|r| r.get(0),
)?;
Ok(n > 0)
})
}
pub fn claim_source_ingest_tx(
tx: &Transaction<'_>,
source_kind: SourceKind,
source_id: &str,
now_ms: i64,
) -> Result<bool> {
let inserted = tx.execute(
"INSERT OR IGNORE INTO mem_tree_ingested_sources \
(source_kind, source_id, ingested_at_ms) \
VALUES (?1, ?2, ?3)",
params![source_kind.as_str(), source_id, now_ms],
)?;
Ok(inserted > 0)
}
pub const RAW_FILE_GATE_KIND: &str = "raw_file";
pub fn mark_raw_paths_ingested(config: &MemoryConfig, rel_paths: &[String]) -> Result<u64> {
if rel_paths.is_empty() {
return Ok(0);
}
let now_ms = Utc::now().timestamp_millis();
with_connection(config, |conn| {
let tx = conn.unchecked_transaction()?;
let mut inserted: u64 = 0;
{
let mut stmt = tx.prepare(
"INSERT OR IGNORE INTO mem_tree_ingested_sources \
(source_kind, source_id, ingested_at_ms) \
VALUES (?1, ?2, ?3)",
)?;
for path in rel_paths {
inserted += stmt.execute(params![RAW_FILE_GATE_KIND, path, now_ms])? as u64;
}
}
tx.commit()?;
Ok(inserted)
})
}
pub fn filter_raw_paths_not_ingested(
config: &MemoryConfig,
rel_paths: &[String],
) -> Result<Vec<String>> {
if rel_paths.is_empty() {
return Ok(Vec::new());
}
with_connection(config, |conn| {
let mut stmt = conn.prepare(
"SELECT COUNT(*) FROM mem_tree_ingested_sources \
WHERE source_kind = ?1 AND source_id = ?2",
)?;
let mut out: Vec<String> = Vec::new();
for path in rel_paths {
let n: i64 = stmt.query_row(params![RAW_FILE_GATE_KIND, path], |r| r.get(0))?;
if n == 0 {
out.push(path.clone());
}
}
Ok(out)
})
}
pub fn count_raw_paths_ingested_with_prefix(
config: &MemoryConfig,
rel_prefix: &str,
) -> Result<u64> {
with_connection(config, |conn| {
let mut stmt =
conn.prepare("SELECT source_id FROM mem_tree_ingested_sources WHERE source_kind = ?1")?;
let rows = stmt.query_map(params![RAW_FILE_GATE_KIND], |r| r.get::<_, String>(0))?;
let mut n: u64 = 0;
for row in rows {
if row?.starts_with(rel_prefix) {
n += 1;
}
}
Ok(n)
})
}