moire-web 2.0.0-rc.0

Web dashboard server for moire: REST API, MCP tools, and live entity graph viewer
Documentation
use facet::Facet;
use facet_value::Value;
use moire_types::{ProcessId, ScopeEntityLink, SqlResponse};
use rusqlite_facet::ConnectionFacetExt;

use crate::db::Db;

#[derive(Facet)]
struct ScopeEntityLinkParams {
    process_id: ProcessId,
}

pub fn fetch_scope_entity_links_blocking(
    db: &Db,
    process_id: ProcessId,
) -> Result<Vec<ScopeEntityLink>, String> {
    let conn = db.open()?;
    conn.facet_query_ref::<ScopeEntityLink, _>(
        "SELECT scope_id, entity_id FROM entity_scope_links WHERE process_id = :process_id",
        &ScopeEntityLinkParams { process_id },
    )
    .map_err(|error| format!("query scope_entity_links: {error}"))
}

pub fn sql_query_blocking(db: &Db, sql: &str) -> Result<SqlResponse, String> {
    let sql = sql.trim();
    if sql.is_empty() {
        return Err("empty SQL".to_string());
    }

    let conn = db.open()?;
    let mut stmt = conn
        .prepare(sql)
        .map_err(|error| format!("prepare sql: {error}"))?;
    if !stmt.readonly() {
        return Err("only read-only statements are allowed".to_string());
    }

    let column_count = stmt.column_count();
    let columns: Vec<String> = (0..column_count)
        .map(|index| String::from(stmt.column_name(index).unwrap_or("?")))
        .collect();

    let mut rows = Vec::new();
    let mut raw_rows = stmt.raw_query();

    while let Some(row) = raw_rows
        .next()
        .map_err(|error| format!("query row: {error}"))?
    {
        let mut row_values = Vec::with_capacity(column_count);
        for index in 0..column_count {
            let value_ref = row
                .get_ref(index)
                .map_err(|error| format!("read column {index}: {error}"))?;
            row_values.push(moire_sqlite_facet::sqlite_value_ref_to_facet(value_ref));
        }
        let row_value: Value = row_values.into_iter().collect();
        rows.push(row_value);
    }

    Ok(SqlResponse {
        columns,
        row_count: rows.len() as u32,
        rows,
    })
}

pub fn query_named_blocking(db: &Db, name: &str, limit: u32) -> Result<SqlResponse, String> {
    let sql = named_query_sql(name, limit)?;
    sql_query_blocking(db, &sql)
}

fn named_query_sql(name: &str, limit: u32) -> Result<String, String> {
    match name {
        "blockers" => Ok(format!(
            "select \
             e.src_id as waiter_id, \
             json_extract(src.entity_json, '$.name') as waiter_name, \
             e.dst_id as blocked_on_id, \
             json_extract(dst.entity_json, '$.name') as blocked_on_name, \
             e.kind_json \
             from edges e \
             left join entities src on src.process_id = e.process_id and src.entity_id = e.src_id \
             left join entities dst on dst.process_id = e.process_id and dst.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
             order by e.updated_at_ns desc \
             limit {limit}"
        )),
        "blocked-senders" => Ok(format!(
            "select \
             f.entity_id as send_future_id, \
             json_extract(f.entity_json, '$.name') as send_name, \
             e.dst_id as waiting_on_entity_id, \
             json_extract(ch.entity_json, '$.name') as waiting_on_name, \
             e.updated_at_ns \
             from edges e \
             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
               and json_extract(f.entity_json, '$.body') = 'future' \
               and json_extract(f.entity_json, '$.name') like '%.send' \
             order by e.updated_at_ns desc \
             limit {limit}"
        )),
        "blocked-receivers" => Ok(format!(
            "select \
             f.entity_id as recv_future_id, \
             json_extract(f.entity_json, '$.name') as recv_name, \
             e.dst_id as waiting_on_entity_id, \
             json_extract(ch.entity_json, '$.name') as waiting_on_name, \
             e.updated_at_ns \
             from edges e \
             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
               and json_extract(f.entity_json, '$.body') = 'future' \
               and json_extract(f.entity_json, '$.name') like '%.recv' \
             order by e.updated_at_ns desc \
             limit {limit}"
        )),
        "stalled-sends" => Ok(format!(
            "select \
             f.entity_id as send_future_id, \
             json_extract(f.entity_json, '$.name') as send_name, \
             e.dst_id as waiting_on_entity_id, \
             json_extract(ch.entity_json, '$.name') as waiting_on_name \
             from edges e \
             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
               and json_extract(f.entity_json, '$.body') = 'future' \
               and json_extract(f.entity_json, '$.name') like '%.send' \
             order by e.updated_at_ns desc \
             limit {limit}"
        )),
        "channel-pressure" => Ok(format!(
            "select \
             entity_id, \
             json_extract(entity_json, '$.name') as name, \
             coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) as capacity, \
             coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy')) as occupancy, \
             case \
               when coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) > 0 \
               then cast(coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy')) as real) / \
                    cast(coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) as real) \
               else null \
             end as utilization \
             from entities \
             where json_extract(entity_json, '$.body.channel_tx.details.mpsc') is not null \
                or json_extract(entity_json, '$.body.channel_rx.details.mpsc') is not null \
             order by utilization desc, name asc \
             limit {limit}"
        )),
        "channel-health" => Ok(format!(
            "select \
             entity_id, \
             json_extract(entity_json, '$.name') as name, \
             coalesce( \
               json_extract(entity_json, '$.body.channel_tx.lifecycle'), \
               json_extract(entity_json, '$.body.channel_rx.lifecycle') \
             ) as lifecycle, \
             coalesce( \
               json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), \
               json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity') \
             ) as capacity, \
             coalesce( \
               json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), \
               json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy') \
             ) as occupancy \
             from entities \
             where json_extract(entity_json, '$.body.channel_tx') is not null \
                or json_extract(entity_json, '$.body.channel_rx') is not null \
             order by name \
             limit {limit}"
        )),
        "scope-membership" => Ok(format!(
            "select \
             l.scope_id, \
             json_extract(s.scope_json, '$.name') as scope_name, \
             l.entity_id, \
             json_extract(e.entity_json, '$.name') as entity_name \
             from entity_scope_links l \
             left join scopes s on s.process_id = l.process_id and s.scope_id = l.scope_id \
             left join entities e on e.process_id = l.process_id and e.entity_id = l.entity_id \
             order by scope_name asc, entity_name asc \
             limit {limit}"
        )),
        "missing-scope-links" => Ok(format!(
            "select \
             e.process_id as process_id, \
             c.process_name, \
             c.pid, \
             e.entity_id, \
             json_extract(e.entity_json, '$.name') as entity_name, \
             json_extract(e.entity_json, '$.body') as entity_body, \
             case \
               when p.process_scope_count is null then 1 \
               else 0 \
             end as missing_process_scope_link, \
             case \
               when json_extract(e.entity_json, '$.body') = 'future' and t.task_scope_count is null then 1 \
               else 0 \
             end as missing_task_scope_link \
             from entities e \
             left join connections c \
               on c.process_id = e.process_id \
             left join ( \
               select \
                 l.process_id, \
                 l.entity_id, \
                 count(*) as process_scope_count \
               from entity_scope_links l \
               join scopes s \
                 on s.process_id = l.process_id \
                and s.scope_id = l.scope_id \
               where json_extract(s.scope_json, '$.body') = 'process' \
               group by l.process_id, l.entity_id \
             ) p \
               on p.process_id = e.process_id \
              and p.entity_id = e.entity_id \
             left join ( \
               select \
                 l.process_id, \
                 l.entity_id, \
                 count(*) as task_scope_count \
               from entity_scope_links l \
               join scopes s \
                 on s.process_id = l.process_id \
                and s.scope_id = l.scope_id \
               where json_extract(s.scope_json, '$.body') = 'task' \
               group by l.process_id, l.entity_id \
             ) t \
               on t.process_id = e.process_id \
              and t.entity_id = e.entity_id \
             where p.process_scope_count is null \
                or (json_extract(e.entity_json, '$.body') = 'future' and t.task_scope_count is null) \
             order by c.process_name asc, entity_name asc, e.entity_id asc \
             limit {limit}"
        )),
        "stale-blockers" => Ok(format!(
            "select \
             e.src_id as waiter_id, \
             json_extract(src.entity_json, '$.name') as waiter_name, \
             e.dst_id as blocked_on_id, \
             json_extract(dst.entity_json, '$.name') as blocked_on_name, \
             e.updated_at_ns \
             from edges e \
             left join entities src on src.process_id = e.process_id and src.entity_id = e.src_id \
             left join entities dst on dst.process_id = e.process_id and dst.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
             order by e.updated_at_ns asc \
             limit {limit}"
        )),
        _ => Err(format!(
            "unknown query pack: {name}. expected one of: blockers, blocked-senders, blocked-receivers, stalled-sends, channel-pressure, channel-health, scope-membership, missing-scope-links, stale-blockers"
        )),
    }
}