Skip to main content

moire_web/db/
query.rs

1use facet::Facet;
2use facet_value::Value;
3use moire_types::{ProcessId, ScopeEntityLink, SqlResponse};
4use rusqlite_facet::ConnectionFacetExt;
5
6use crate::db::Db;
7
8#[derive(Facet)]
9struct ScopeEntityLinkParams {
10    process_id: ProcessId,
11}
12
13pub fn fetch_scope_entity_links_blocking(
14    db: &Db,
15    process_id: ProcessId,
16) -> Result<Vec<ScopeEntityLink>, String> {
17    let conn = db.open()?;
18    conn.facet_query_ref::<ScopeEntityLink, _>(
19        "SELECT scope_id, entity_id FROM entity_scope_links WHERE process_id = :process_id",
20        &ScopeEntityLinkParams { process_id },
21    )
22    .map_err(|error| format!("query scope_entity_links: {error}"))
23}
24
25pub fn sql_query_blocking(db: &Db, sql: &str) -> Result<SqlResponse, String> {
26    let sql = sql.trim();
27    if sql.is_empty() {
28        return Err("empty SQL".to_string());
29    }
30
31    let conn = db.open()?;
32    let mut stmt = conn
33        .prepare(sql)
34        .map_err(|error| format!("prepare sql: {error}"))?;
35    if !stmt.readonly() {
36        return Err("only read-only statements are allowed".to_string());
37    }
38
39    let column_count = stmt.column_count();
40    let columns: Vec<String> = (0..column_count)
41        .map(|index| String::from(stmt.column_name(index).unwrap_or("?")))
42        .collect();
43
44    let mut rows = Vec::new();
45    let mut raw_rows = stmt.raw_query();
46
47    loop {
48        let Some(row) = raw_rows
49            .next()
50            .map_err(|error| format!("query row: {error}"))?
51        else {
52            break;
53        };
54
55        let mut row_values = Vec::with_capacity(column_count);
56        for index in 0..column_count {
57            let value_ref = row
58                .get_ref(index)
59                .map_err(|error| format!("read column {index}: {error}"))?;
60            row_values.push(moire_sqlite_facet::sqlite_value_ref_to_facet(value_ref));
61        }
62        let row_value: Value = row_values.into_iter().collect();
63        rows.push(row_value);
64    }
65
66    Ok(SqlResponse {
67        columns,
68        row_count: rows.len() as u32,
69        rows,
70    })
71}
72
73pub fn query_named_blocking(db: &Db, name: &str, limit: u32) -> Result<SqlResponse, String> {
74    let sql = named_query_sql(name, limit)?;
75    sql_query_blocking(db, &sql)
76}
77
78fn named_query_sql(name: &str, limit: u32) -> Result<String, String> {
79    match name {
80        "blockers" => Ok(format!(
81            "select \
82             e.src_id as waiter_id, \
83             json_extract(src.entity_json, '$.name') as waiter_name, \
84             e.dst_id as blocked_on_id, \
85             json_extract(dst.entity_json, '$.name') as blocked_on_name, \
86             e.kind_json \
87             from edges e \
88             left join entities src on src.process_id = e.process_id and src.entity_id = e.src_id \
89             left join entities dst on dst.process_id = e.process_id and dst.entity_id = e.dst_id \
90             where e.kind_json = '\"needs\"' \
91             order by e.updated_at_ns desc \
92             limit {limit}"
93        )),
94        "blocked-senders" => Ok(format!(
95            "select \
96             f.entity_id as send_future_id, \
97             json_extract(f.entity_json, '$.name') as send_name, \
98             e.dst_id as waiting_on_entity_id, \
99             json_extract(ch.entity_json, '$.name') as waiting_on_name, \
100             e.updated_at_ns \
101             from edges e \
102             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
103             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
104             where e.kind_json = '\"needs\"' \
105               and json_extract(f.entity_json, '$.body') = 'future' \
106               and json_extract(f.entity_json, '$.name') like '%.send' \
107             order by e.updated_at_ns desc \
108             limit {limit}"
109        )),
110        "blocked-receivers" => Ok(format!(
111            "select \
112             f.entity_id as recv_future_id, \
113             json_extract(f.entity_json, '$.name') as recv_name, \
114             e.dst_id as waiting_on_entity_id, \
115             json_extract(ch.entity_json, '$.name') as waiting_on_name, \
116             e.updated_at_ns \
117             from edges e \
118             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
119             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
120             where e.kind_json = '\"needs\"' \
121               and json_extract(f.entity_json, '$.body') = 'future' \
122               and json_extract(f.entity_json, '$.name') like '%.recv' \
123             order by e.updated_at_ns desc \
124             limit {limit}"
125        )),
126        "stalled-sends" => Ok(format!(
127            "select \
128             f.entity_id as send_future_id, \
129             json_extract(f.entity_json, '$.name') as send_name, \
130             e.dst_id as waiting_on_entity_id, \
131             json_extract(ch.entity_json, '$.name') as waiting_on_name \
132             from edges e \
133             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
134             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
135             where e.kind_json = '\"needs\"' \
136               and json_extract(f.entity_json, '$.body') = 'future' \
137               and json_extract(f.entity_json, '$.name') like '%.send' \
138             order by e.updated_at_ns desc \
139             limit {limit}"
140        )),
141        "channel-pressure" => Ok(format!(
142            "select \
143             entity_id, \
144             json_extract(entity_json, '$.name') as name, \
145             coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) as capacity, \
146             coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy')) as occupancy, \
147             case \
148               when coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) > 0 \
149               then cast(coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy')) as real) / \
150                    cast(coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) as real) \
151               else null \
152             end as utilization \
153             from entities \
154             where json_extract(entity_json, '$.body.channel_tx.details.mpsc') is not null \
155                or json_extract(entity_json, '$.body.channel_rx.details.mpsc') is not null \
156             order by utilization desc, name asc \
157             limit {limit}"
158        )),
159        "channel-health" => Ok(format!(
160            "select \
161             entity_id, \
162             json_extract(entity_json, '$.name') as name, \
163             coalesce( \
164               json_extract(entity_json, '$.body.channel_tx.lifecycle'), \
165               json_extract(entity_json, '$.body.channel_rx.lifecycle') \
166             ) as lifecycle, \
167             coalesce( \
168               json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), \
169               json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity') \
170             ) as capacity, \
171             coalesce( \
172               json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), \
173               json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy') \
174             ) as occupancy \
175             from entities \
176             where json_extract(entity_json, '$.body.channel_tx') is not null \
177                or json_extract(entity_json, '$.body.channel_rx') is not null \
178             order by name \
179             limit {limit}"
180        )),
181        "scope-membership" => Ok(format!(
182            "select \
183             l.scope_id, \
184             json_extract(s.scope_json, '$.name') as scope_name, \
185             l.entity_id, \
186             json_extract(e.entity_json, '$.name') as entity_name \
187             from entity_scope_links l \
188             left join scopes s on s.process_id = l.process_id and s.scope_id = l.scope_id \
189             left join entities e on e.process_id = l.process_id and e.entity_id = l.entity_id \
190             order by scope_name asc, entity_name asc \
191             limit {limit}"
192        )),
193        "missing-scope-links" => Ok(format!(
194            "select \
195             e.process_id as process_id, \
196             c.process_name, \
197             c.pid, \
198             e.entity_id, \
199             json_extract(e.entity_json, '$.name') as entity_name, \
200             json_extract(e.entity_json, '$.body') as entity_body, \
201             case \
202               when p.process_scope_count is null then 1 \
203               else 0 \
204             end as missing_process_scope_link, \
205             case \
206               when json_extract(e.entity_json, '$.body') = 'future' and t.task_scope_count is null then 1 \
207               else 0 \
208             end as missing_task_scope_link \
209             from entities e \
210             left join connections c \
211               on c.process_id = e.process_id \
212             left join ( \
213               select \
214                 l.process_id, \
215                 l.entity_id, \
216                 count(*) as process_scope_count \
217               from entity_scope_links l \
218               join scopes s \
219                 on s.process_id = l.process_id \
220                and s.scope_id = l.scope_id \
221               where json_extract(s.scope_json, '$.body') = 'process' \
222               group by l.process_id, l.entity_id \
223             ) p \
224               on p.process_id = e.process_id \
225              and p.entity_id = e.entity_id \
226             left join ( \
227               select \
228                 l.process_id, \
229                 l.entity_id, \
230                 count(*) as task_scope_count \
231               from entity_scope_links l \
232               join scopes s \
233                 on s.process_id = l.process_id \
234                and s.scope_id = l.scope_id \
235               where json_extract(s.scope_json, '$.body') = 'task' \
236               group by l.process_id, l.entity_id \
237             ) t \
238               on t.process_id = e.process_id \
239              and t.entity_id = e.entity_id \
240             where p.process_scope_count is null \
241                or (json_extract(e.entity_json, '$.body') = 'future' and t.task_scope_count is null) \
242             order by c.process_name asc, entity_name asc, e.entity_id asc \
243             limit {limit}"
244        )),
245        "stale-blockers" => Ok(format!(
246            "select \
247             e.src_id as waiter_id, \
248             json_extract(src.entity_json, '$.name') as waiter_name, \
249             e.dst_id as blocked_on_id, \
250             json_extract(dst.entity_json, '$.name') as blocked_on_name, \
251             e.updated_at_ns \
252             from edges e \
253             left join entities src on src.process_id = e.process_id and src.entity_id = e.src_id \
254             left join entities dst on dst.process_id = e.process_id and dst.entity_id = e.dst_id \
255             where e.kind_json = '\"needs\"' \
256             order by e.updated_at_ns asc \
257             limit {limit}"
258        )),
259        _ => Err(format!(
260            "unknown query pack: {name}. expected one of: blockers, blocked-senders, blocked-receivers, stalled-sends, channel-pressure, channel-health, scope-membership, missing-scope-links, stale-blockers"
261        )),
262    }
263}