1use facet::Facet;
2use facet_value::Value;
3use moire_types::{ProcessId, ScopeEntityLink, SqlResponse};
4use rusqlite_facet::ConnectionFacetExt;
5
6use crate::db::Db;
7
8#[derive(Facet)]
9struct ScopeEntityLinkParams {
10 process_id: ProcessId,
11}
12
13pub fn fetch_scope_entity_links_blocking(
14 db: &Db,
15 process_id: ProcessId,
16) -> Result<Vec<ScopeEntityLink>, String> {
17 let conn = db.open()?;
18 conn.facet_query_ref::<ScopeEntityLink, _>(
19 "SELECT scope_id, entity_id FROM entity_scope_links WHERE process_id = :process_id",
20 &ScopeEntityLinkParams { process_id },
21 )
22 .map_err(|error| format!("query scope_entity_links: {error}"))
23}
24
25pub fn sql_query_blocking(db: &Db, sql: &str) -> Result<SqlResponse, String> {
26 let sql = sql.trim();
27 if sql.is_empty() {
28 return Err("empty SQL".to_string());
29 }
30
31 let conn = db.open()?;
32 let mut stmt = conn
33 .prepare(sql)
34 .map_err(|error| format!("prepare sql: {error}"))?;
35 if !stmt.readonly() {
36 return Err("only read-only statements are allowed".to_string());
37 }
38
39 let column_count = stmt.column_count();
40 let columns: Vec<String> = (0..column_count)
41 .map(|index| String::from(stmt.column_name(index).unwrap_or("?")))
42 .collect();
43
44 let mut rows = Vec::new();
45 let mut raw_rows = stmt.raw_query();
46
47 loop {
48 let Some(row) = raw_rows
49 .next()
50 .map_err(|error| format!("query row: {error}"))?
51 else {
52 break;
53 };
54
55 let mut row_values = Vec::with_capacity(column_count);
56 for index in 0..column_count {
57 let value_ref = row
58 .get_ref(index)
59 .map_err(|error| format!("read column {index}: {error}"))?;
60 row_values.push(moire_sqlite_facet::sqlite_value_ref_to_facet(value_ref));
61 }
62 let row_value: Value = row_values.into_iter().collect();
63 rows.push(row_value);
64 }
65
66 Ok(SqlResponse {
67 columns,
68 row_count: rows.len() as u32,
69 rows,
70 })
71}
72
73pub fn query_named_blocking(db: &Db, name: &str, limit: u32) -> Result<SqlResponse, String> {
74 let sql = named_query_sql(name, limit)?;
75 sql_query_blocking(db, &sql)
76}
77
/// Builds the SQL text for a predefined query pack, ending in `limit {limit}`.
///
/// Known pack names:
/// - `blockers`: `"needs"` edges with source/destination entity names, newest first.
/// - `blocked-senders`: `"needs"` edges whose source is a `future` named `%.send`.
/// - `blocked-receivers`: `"needs"` edges whose source is a `future` named `%.recv`.
/// - `stalled-sends`: same filter as `blocked-senders`, without the `updated_at_ns` column.
/// - `channel-pressure`: mpsc channel capacity, occupancy and their ratio.
/// - `channel-health`: channel lifecycle, capacity and occupancy.
/// - `scope-membership`: scope/entity links with scope and entity names.
/// - `missing-scope-links`: entities lacking a process scope link, or futures
///   lacking a task scope link.
/// - `stale-blockers`: `"needs"` edges with entity names, oldest first.
///
/// # Errors
///
/// Returns a message listing the accepted names when `name` matches none of them.
fn named_query_sql(name: &str, limit: u32) -> Result<String, String> {
    // Each arm yields the statement up to (and including) the space that
    // precedes the `limit` clause; the clause itself is appended once below.
    let select_body: &str = match name {
        "blockers" => {
            "select \
             e.src_id as waiter_id, \
             json_extract(src.entity_json, '$.name') as waiter_name, \
             e.dst_id as blocked_on_id, \
             json_extract(dst.entity_json, '$.name') as blocked_on_name, \
             e.kind_json \
             from edges e \
             left join entities src on src.process_id = e.process_id and src.entity_id = e.src_id \
             left join entities dst on dst.process_id = e.process_id and dst.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
             order by e.updated_at_ns desc "
        }
        "blocked-senders" => {
            "select \
             f.entity_id as send_future_id, \
             json_extract(f.entity_json, '$.name') as send_name, \
             e.dst_id as waiting_on_entity_id, \
             json_extract(ch.entity_json, '$.name') as waiting_on_name, \
             e.updated_at_ns \
             from edges e \
             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
             and json_extract(f.entity_json, '$.body') = 'future' \
             and json_extract(f.entity_json, '$.name') like '%.send' \
             order by e.updated_at_ns desc "
        }
        "blocked-receivers" => {
            "select \
             f.entity_id as recv_future_id, \
             json_extract(f.entity_json, '$.name') as recv_name, \
             e.dst_id as waiting_on_entity_id, \
             json_extract(ch.entity_json, '$.name') as waiting_on_name, \
             e.updated_at_ns \
             from edges e \
             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
             and json_extract(f.entity_json, '$.body') = 'future' \
             and json_extract(f.entity_json, '$.name') like '%.recv' \
             order by e.updated_at_ns desc "
        }
        "stalled-sends" => {
            "select \
             f.entity_id as send_future_id, \
             json_extract(f.entity_json, '$.name') as send_name, \
             e.dst_id as waiting_on_entity_id, \
             json_extract(ch.entity_json, '$.name') as waiting_on_name \
             from edges e \
             join entities f on f.process_id = e.process_id and f.entity_id = e.src_id \
             left join entities ch on ch.process_id = e.process_id and ch.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
             and json_extract(f.entity_json, '$.body') = 'future' \
             and json_extract(f.entity_json, '$.name') like '%.send' \
             order by e.updated_at_ns desc "
        }
        "channel-pressure" => {
            "select \
             entity_id, \
             json_extract(entity_json, '$.name') as name, \
             coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) as capacity, \
             coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy')) as occupancy, \
             case \
             when coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) > 0 \
             then cast(coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy')) as real) / \
             cast(coalesce(json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity')) as real) \
             else null \
             end as utilization \
             from entities \
             where json_extract(entity_json, '$.body.channel_tx.details.mpsc') is not null \
             or json_extract(entity_json, '$.body.channel_rx.details.mpsc') is not null \
             order by utilization desc, name asc "
        }
        "channel-health" => {
            "select \
             entity_id, \
             json_extract(entity_json, '$.name') as name, \
             coalesce( \
             json_extract(entity_json, '$.body.channel_tx.lifecycle'), \
             json_extract(entity_json, '$.body.channel_rx.lifecycle') \
             ) as lifecycle, \
             coalesce( \
             json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.capacity'), \
             json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.capacity') \
             ) as capacity, \
             coalesce( \
             json_extract(entity_json, '$.body.channel_tx.details.mpsc.buffer.occupancy'), \
             json_extract(entity_json, '$.body.channel_rx.details.mpsc.buffer.occupancy') \
             ) as occupancy \
             from entities \
             where json_extract(entity_json, '$.body.channel_tx') is not null \
             or json_extract(entity_json, '$.body.channel_rx') is not null \
             order by name "
        }
        "scope-membership" => {
            "select \
             l.scope_id, \
             json_extract(s.scope_json, '$.name') as scope_name, \
             l.entity_id, \
             json_extract(e.entity_json, '$.name') as entity_name \
             from entity_scope_links l \
             left join scopes s on s.process_id = l.process_id and s.scope_id = l.scope_id \
             left join entities e on e.process_id = l.process_id and e.entity_id = l.entity_id \
             order by scope_name asc, entity_name asc "
        }
        "missing-scope-links" => {
            "select \
             e.process_id as process_id, \
             c.process_name, \
             c.pid, \
             e.entity_id, \
             json_extract(e.entity_json, '$.name') as entity_name, \
             json_extract(e.entity_json, '$.body') as entity_body, \
             case \
             when p.process_scope_count is null then 1 \
             else 0 \
             end as missing_process_scope_link, \
             case \
             when json_extract(e.entity_json, '$.body') = 'future' and t.task_scope_count is null then 1 \
             else 0 \
             end as missing_task_scope_link \
             from entities e \
             left join connections c \
             on c.process_id = e.process_id \
             left join ( \
             select \
             l.process_id, \
             l.entity_id, \
             count(*) as process_scope_count \
             from entity_scope_links l \
             join scopes s \
             on s.process_id = l.process_id \
             and s.scope_id = l.scope_id \
             where json_extract(s.scope_json, '$.body') = 'process' \
             group by l.process_id, l.entity_id \
             ) p \
             on p.process_id = e.process_id \
             and p.entity_id = e.entity_id \
             left join ( \
             select \
             l.process_id, \
             l.entity_id, \
             count(*) as task_scope_count \
             from entity_scope_links l \
             join scopes s \
             on s.process_id = l.process_id \
             and s.scope_id = l.scope_id \
             where json_extract(s.scope_json, '$.body') = 'task' \
             group by l.process_id, l.entity_id \
             ) t \
             on t.process_id = e.process_id \
             and t.entity_id = e.entity_id \
             where p.process_scope_count is null \
             or (json_extract(e.entity_json, '$.body') = 'future' and t.task_scope_count is null) \
             order by c.process_name asc, entity_name asc, e.entity_id asc "
        }
        "stale-blockers" => {
            "select \
             e.src_id as waiter_id, \
             json_extract(src.entity_json, '$.name') as waiter_name, \
             e.dst_id as blocked_on_id, \
             json_extract(dst.entity_json, '$.name') as blocked_on_name, \
             e.updated_at_ns \
             from edges e \
             left join entities src on src.process_id = e.process_id and src.entity_id = e.src_id \
             left join entities dst on dst.process_id = e.process_id and dst.entity_id = e.dst_id \
             where e.kind_json = '\"needs\"' \
             order by e.updated_at_ns asc "
        }
        _ => {
            return Err(format!(
                "unknown query pack: {name}. expected one of: blockers, blocked-senders, blocked-receivers, stalled-sends, channel-pressure, channel-health, scope-membership, missing-scope-links, stale-blockers"
            ));
        }
    };

    Ok(format!("{select_body}limit {limit}"))
}