nils-memo-cli 0.5.3

CLI crate for nils-memo-cli in the nils-cli workspace.
Documentation
use rusqlite::{Connection, params};
use serde::Serialize;

use crate::errors::AppError;

const INBOX_ITEM_ALLOCATOR_NAME: &str = "inbox_items";

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryState {
    All,
    Pending,
    Enriched,
}

#[derive(Debug, Clone, Serialize)]
pub struct AddedItem {
    pub item_id: i64,
    pub created_at: String,
    pub source: String,
    pub text: String,
}

#[derive(Debug, Clone, Serialize)]
pub struct UpdatedItem {
    pub item_id: i64,
    pub updated_at: String,
    pub text: String,
    pub cleared_derivations: i64,
    pub cleared_workflow_anchors: i64,
}

#[derive(Debug, Clone, Serialize)]
pub struct DeletedItem {
    pub item_id: i64,
    pub deleted_at: String,
    pub removed_derivations: i64,
    pub removed_workflow_anchors: i64,
}

#[derive(Debug, Clone, Serialize)]
pub struct ListItem {
    pub item_id: i64,
    pub created_at: String,
    pub state: String,
    pub text_preview: String,
    pub content_type: Option<String>,
    pub validation_status: Option<String>,
}

#[derive(Debug, Clone, Serialize)]
pub struct FetchItem {
    pub item_id: i64,
    pub created_at: String,
    pub source: String,
    pub text: String,
    pub state: String,
    pub content_type: Option<String>,
    pub validation_status: Option<String>,
}

#[derive(Debug, Clone)]
pub struct FetchCursor {
    pub item_id: i64,
    pub created_at: String,
}

fn ensure_item_allocator_seeded(conn: &Connection) -> Result<(), AppError> {
    conn.execute(
        "insert into id_allocators(name, last_id)
         values (?1, coalesce((select max(item_id) from inbox_items), 0))
         on conflict(name) do update
         set last_id = max(id_allocators.last_id, excluded.last_id)",
        params![INBOX_ITEM_ALLOCATOR_NAME],
    )
    .map_err(AppError::db_write)?;
    Ok(())
}

fn allocate_next_item_id(conn: &Connection) -> Result<i64, AppError> {
    ensure_item_allocator_seeded(conn)?;
    conn.execute(
        "update id_allocators
         set last_id = last_id + 1
         where name = ?1",
        params![INBOX_ITEM_ALLOCATOR_NAME],
    )
    .map_err(AppError::db_write)?;

    conn.query_row(
        "select last_id from id_allocators where name = ?1",
        params![INBOX_ITEM_ALLOCATOR_NAME],
        |row| row.get(0),
    )
    .map_err(AppError::db_query)
}

pub fn add_item(
    conn: &Connection,
    text: &str,
    source: &str,
    created_at: Option<&str>,
) -> Result<AddedItem, AppError> {
    let item_id = allocate_next_item_id(conn)?;

    if let Some(created_at) = created_at {
        conn.execute(
            "insert into inbox_items(item_id, source, raw_text, created_at)
             values(?1, ?2, ?3, ?4)",
            params![item_id, source, text, created_at],
        )
        .map_err(AppError::db_write)?;
    } else {
        conn.execute(
            "insert into inbox_items(item_id, source, raw_text) values(?1, ?2, ?3)",
            params![item_id, source, text],
        )
        .map_err(AppError::db_write)?;
    }

    let created_at: String = conn
        .query_row(
            "select created_at from inbox_items where item_id = ?1",
            params![item_id],
            |row| row.get(0),
        )
        .map_err(AppError::db_query)?;

    Ok(AddedItem {
        item_id,
        created_at,
        source: source.to_string(),
        text: text.to_string(),
    })
}

pub fn update_item(conn: &Connection, item_id: i64, text: &str) -> Result<UpdatedItem, AppError> {
    let text = text.trim();
    if text.is_empty() {
        return Err(AppError::usage("update requires a non-empty text argument"));
    }

    let exists: i64 = conn
        .query_row(
            "select count(*) from inbox_items where item_id = ?1",
            params![item_id],
            |row| row.get(0),
        )
        .map_err(AppError::db_query)?;
    if exists == 0 {
        return Err(AppError::usage("item_id does not exist")
            .with_code("invalid-item-id")
            .with_details(serde_json::json!({ "item_id": item_id })));
    }

    let removed_workflow_anchors = conn
        .execute(
            "delete from workflow_item_anchors where item_id = ?1",
            params![item_id],
        )
        .map_err(AppError::db_write)? as i64;
    let cleared_derivations = conn
        .execute(
            "delete from item_derivations where item_id = ?1",
            params![item_id],
        )
        .map_err(AppError::db_write)? as i64;
    conn.execute(
        "update inbox_items set raw_text = ?1 where item_id = ?2",
        params![text, item_id],
    )
    .map_err(AppError::db_write)?;
    conn.execute(
        "delete from tags
         where not exists (
           select 1
           from item_tags it
           where it.tag_id = tags.tag_id
         )",
        [],
    )
    .map_err(AppError::db_write)?;

    let updated_at: String = conn
        .query_row(
            "select updated_at from item_search_documents where item_id = ?1",
            params![item_id],
            |row| row.get(0),
        )
        .map_err(AppError::db_query)?;

    Ok(UpdatedItem {
        item_id,
        updated_at,
        text: text.to_string(),
        cleared_derivations,
        cleared_workflow_anchors: removed_workflow_anchors,
    })
}

pub fn delete_item_hard(conn: &Connection, item_id: i64) -> Result<DeletedItem, AppError> {
    let exists: i64 = conn
        .query_row(
            "select count(*) from inbox_items where item_id = ?1",
            params![item_id],
            |row| row.get(0),
        )
        .map_err(AppError::db_query)?;
    if exists == 0 {
        return Err(AppError::usage("item_id does not exist")
            .with_code("invalid-item-id")
            .with_details(serde_json::json!({ "item_id": item_id })));
    }

    let removed_workflow_anchors = conn
        .execute(
            "delete from workflow_item_anchors where item_id = ?1",
            params![item_id],
        )
        .map_err(AppError::db_write)? as i64;
    let removed_derivations = conn
        .execute(
            "delete from item_derivations where item_id = ?1",
            params![item_id],
        )
        .map_err(AppError::db_write)? as i64;
    conn.execute(
        "delete from item_search_documents where item_id = ?1",
        params![item_id],
    )
    .map_err(AppError::db_write)?;
    conn.execute(
        "delete from inbox_items where item_id = ?1",
        params![item_id],
    )
    .map_err(AppError::db_write)?;
    conn.execute(
        "delete from tags
         where not exists (
           select 1
           from item_tags it
           where it.tag_id = tags.tag_id
         )",
        [],
    )
    .map_err(AppError::db_write)?;

    let deleted_at: String = conn
        .query_row("select strftime('%Y-%m-%dT%H:%M:%fZ', 'now')", [], |row| {
            row.get(0)
        })
        .map_err(AppError::db_query)?;

    Ok(DeletedItem {
        item_id,
        deleted_at,
        removed_derivations,
        removed_workflow_anchors,
    })
}

pub fn list_items(
    conn: &Connection,
    state: QueryState,
    limit: usize,
    offset: usize,
) -> Result<Vec<ListItem>, AppError> {
    let state_filter = state_sql(state);
    let sql = format!(
        "select
            i.item_id,
            i.created_at,
            case
                when ad.derivation_id is not null then 'enriched'
                else 'pending'
            end as state,
            substr(i.raw_text, 1, 80) as text_preview,
            json_extract(ad.payload_json, '$.content_type') as content_type,
            json_extract(ad.payload_json, '$.validation_status') as validation_status
        from inbox_items i
        left join item_derivations ad
          on ad.derivation_id = (
            select d.derivation_id
            from item_derivations d
            where d.item_id = i.item_id
              and d.is_active = 1
              and d.status = 'accepted'
            order by d.derivation_version desc, d.derivation_id desc
            limit 1
          )
        where {state_filter}
        order by i.created_at desc, i.item_id desc
        limit ?1 offset ?2"
    );

    let mut stmt = conn.prepare(&sql).map_err(AppError::db_query)?;
    let rows = stmt
        .query_map(params![limit as i64, offset as i64], |row| {
            Ok(ListItem {
                item_id: row.get(0)?,
                created_at: row.get(1)?,
                state: row.get(2)?,
                text_preview: row.get(3)?,
                content_type: row.get(4)?,
                validation_status: row.get(5)?,
            })
        })
        .map_err(AppError::db_query)?;

    rows.collect::<Result<Vec<_>, _>>()
        .map_err(AppError::db_query)
}

pub fn lookup_fetch_cursor(
    conn: &Connection,
    item_id: i64,
) -> Result<Option<FetchCursor>, AppError> {
    conn.query_row(
        "select item_id, created_at from inbox_items where item_id = ?1",
        params![item_id],
        |row| {
            Ok(FetchCursor {
                item_id: row.get(0)?,
                created_at: row.get(1)?,
            })
        },
    )
    .map(Some)
    .or_else(|err| match err {
        rusqlite::Error::QueryReturnedNoRows => Ok(None),
        other => Err(AppError::db_query(other)),
    })
}

pub fn fetch_pending_page(
    conn: &Connection,
    limit: usize,
    cursor: Option<&FetchCursor>,
) -> Result<Vec<FetchItem>, AppError> {
    let mut stmt = conn
        .prepare(
            "select i.item_id, i.created_at, i.source, i.raw_text
                    , null as content_type
                    , null as validation_status
            from inbox_items i
            where not exists (
                select 1 from item_derivations d
                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
            )
              and (
                ?1 is null
                or i.created_at < ?2
                or (i.created_at = ?2 and i.item_id < ?1)
              )
            order by i.created_at desc, i.item_id desc
            limit ?3",
        )
        .map_err(AppError::db_query)?;

    let cursor_item_id = cursor.map(|value| value.item_id);
    let cursor_created_at = cursor.map(|value| value.created_at.as_str());

    let rows = stmt
        .query_map(
            params![cursor_item_id, cursor_created_at, limit as i64],
            |row| {
                Ok(FetchItem {
                    item_id: row.get(0)?,
                    created_at: row.get(1)?,
                    source: row.get(2)?,
                    text: row.get(3)?,
                    state: "pending".to_string(),
                    content_type: row.get(4)?,
                    validation_status: row.get(5)?,
                })
            },
        )
        .map_err(AppError::db_query)?;

    rows.collect::<Result<Vec<_>, _>>()
        .map_err(AppError::db_query)
}

fn state_sql(state: QueryState) -> &'static str {
    match state {
        QueryState::All => "1 = 1",
        QueryState::Pending => {
            "not exists (
                select 1 from item_derivations d
                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
            )"
        }
        QueryState::Enriched => {
            "exists (
                select 1 from item_derivations d
                where d.item_id = i.item_id and d.is_active = 1 and d.status = 'accepted'
            )"
        }
    }
}