1use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7
8use rustvello_core::error::{RustvelloError, RustvelloResult, TaskError};
9use rustvello_core::state_backend::{
10 StateBackendCore, StateBackendQuery, StateBackendRunner, StoredRunnerContext,
11};
12use rustvello_proto::call::{CallDTO, SerializedArguments};
13use rustvello_proto::identifiers::{CallId, InvocationId, RunnerId, TaskId};
14use rustvello_proto::invocation::{InvocationDTO, InvocationHistory, WorkflowIdentity};
15use rustvello_proto::status::InvocationStatusRecord;
16
17use crate::db::{parse_status, pg_err, Database};
18
/// State backend that persists its data in PostgreSQL.
///
/// The trait implementations on this type obtain a connection from the
/// shared [`Database`] handle for each operation.
pub struct PostgresStateBackend {
    /// Shared database handle; each operation calls `conn()` on it.
    db: Arc<Database>,
}
23
24impl PostgresStateBackend {
25 pub fn new(db: Arc<Database>) -> Self {
26 Self { db }
27 }
28}
29
30#[async_trait]
31impl StateBackendCore for PostgresStateBackend {
32 async fn upsert_invocation(
33 &self,
34 invocation: &InvocationDTO,
35 call: &CallDTO,
36 ) -> RustvelloResult<()> {
37 let mut client = self.db.conn().await?;
38 let tx = client.transaction().await.map_err(pg_err)?;
39
40 let args_json = serde_json::to_string(&call.serialized_arguments.0).map_err(|e| {
41 RustvelloError::Serialization {
42 message: e.to_string(),
43 }
44 })?;
45
46 let (parent_inv_id, wf_id, wf_type, wf_depth) = match &invocation.workflow {
47 Some(wf) => (
48 invocation
49 .parent_invocation_id
50 .as_ref()
51 .map(|id| id.as_str().to_string()),
52 Some(wf.workflow_id.as_str().to_string()),
53 Some(wf.workflow_type.to_string()),
54 Some(wf.depth as i32),
55 ),
56 None => (
57 invocation
58 .parent_invocation_id
59 .as_ref()
60 .map(|id| id.as_str().to_string()),
61 None,
62 None,
63 None,
64 ),
65 };
66
67 tx
68 .execute(
69 "INSERT INTO invocations (invocation_id, task_id, call_id, status, created_at, updated_at,
70 parent_invocation_id, workflow_id, workflow_type, workflow_depth)
71 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
72 ON CONFLICT (invocation_id) DO UPDATE SET
73 task_id = $2, call_id = $3, status = $4, updated_at = $6,
74 parent_invocation_id = $7, workflow_id = $8, workflow_type = $9, workflow_depth = $10",
75 &[
76 &invocation.invocation_id.as_str(),
77 &invocation.task_id.to_string(),
78 &invocation.call_id.to_string(),
79 &invocation.status.to_string(),
80 &invocation.created_at,
81 &invocation.updated_at,
82 &parent_inv_id as &(dyn tokio_postgres::types::ToSql + Sync),
83 &wf_id as &(dyn tokio_postgres::types::ToSql + Sync),
84 &wf_type as &(dyn tokio_postgres::types::ToSql + Sync),
85 &wf_depth as &(dyn tokio_postgres::types::ToSql + Sync),
86 ],
87 )
88 .await
89 .map_err(pg_err)?;
90
91 tx.execute(
92 "INSERT INTO calls (call_id, task_id, serialized_arguments) VALUES ($1, $2, $3)
93 ON CONFLICT (call_id) DO UPDATE SET task_id = $2, serialized_arguments = $3",
94 &[
95 &call.call_id.to_string(),
96 &call.task_id.to_string(),
97 &args_json,
98 ],
99 )
100 .await
101 .map_err(pg_err)?;
102
103 tx.commit().await.map_err(pg_err)?;
104
105 Ok(())
106 }
107
108 async fn get_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<InvocationDTO> {
109 let client = self.db.conn().await?;
110
111 let row = client
112 .query_opt(
113 "SELECT task_id, call_id, status, created_at, updated_at,
114 parent_invocation_id, workflow_id, workflow_type, workflow_depth
115 FROM invocations WHERE invocation_id = $1",
116 &[&invocation_id.as_str()],
117 )
118 .await
119 .map_err(pg_err)?
120 .ok_or_else(|| RustvelloError::InvocationNotFound {
121 invocation_id: invocation_id.clone(),
122 })?;
123
124 let task_id_str: String = row.get(0);
125 let call_id_str: String = row.get(1);
126 let status_str: String = row.get(2);
127 let created_at: chrono::DateTime<Utc> = row.get(3);
128 let updated_at: chrono::DateTime<Utc> = row.get(4);
129 let parent_inv_id: Option<String> = row.get(5);
130 let wf_id: Option<String> = row.get(6);
131 let wf_type: Option<String> = row.get(7);
132 let wf_depth: Option<i32> = row.get(8);
133
134 let task_id: TaskId = task_id_str.parse().map_err(|e| {
135 RustvelloError::state_backend(format!("invalid task_id in database: {e}"))
136 })?;
137
138 let call_id: CallId = call_id_str.parse().map_err(|e| {
139 RustvelloError::state_backend(format!("invalid call_id in database: {e}"))
140 })?;
141
142 let parent_invocation_id = parent_inv_id.map(InvocationId::from_string);
143
144 let workflow = match (wf_id, wf_type) {
145 (Some(wf_id_str), Some(wf_type_str)) => {
146 let wf_task_id: TaskId = wf_type_str.parse().map_err(|e| {
147 RustvelloError::state_backend(format!(
148 "invalid workflow task_id in database: {e}"
149 ))
150 })?;
151 Some(WorkflowIdentity {
152 workflow_id: InvocationId::from_string(wf_id_str),
153 workflow_type: wf_task_id,
154 parent_id: None,
155 depth: u32::try_from(wf_depth.unwrap_or(0)).unwrap_or(0),
156 })
157 }
158 _ => None,
159 };
160
161 Ok(InvocationDTO {
162 invocation_id: invocation_id.clone(),
163 task_id,
164 call_id,
165 status: parse_status(&status_str)?,
166 created_at,
167 updated_at,
168 parent_invocation_id,
169 workflow,
170 })
171 }
172
173 async fn get_call(&self, call_id: &CallId) -> RustvelloResult<CallDTO> {
174 let client = self.db.conn().await?;
175 let call_id_str = call_id.to_string();
176
177 let row = client
178 .query_opt(
179 "SELECT task_id, serialized_arguments FROM calls WHERE call_id = $1",
180 &[&call_id_str],
181 )
182 .await
183 .map_err(pg_err)?
184 .ok_or_else(|| {
185 RustvelloError::state_backend(format!("call not found: {}", call_id_str))
186 })?;
187
188 let task_id_str: String = row.get(0);
189 let args_json: String = row.get(1);
190
191 let task_id: TaskId = task_id_str.parse().map_err(|e| {
192 RustvelloError::state_backend(format!("invalid task_id in database: {e}"))
193 })?;
194
195 let args_map: std::collections::BTreeMap<String, String> = serde_json::from_str(&args_json)
196 .map_err(|e| RustvelloError::Serialization {
197 message: e.to_string(),
198 })?;
199
200 let args = SerializedArguments(args_map);
201
202 Ok(CallDTO {
203 call_id: call_id.clone(),
204 task_id,
205 serialized_arguments: args,
206 })
207 }
208
209 async fn store_result(
210 &self,
211 invocation_id: &InvocationId,
212 result: &str,
213 ) -> RustvelloResult<()> {
214 let client = self.db.conn().await?;
215 client
216 .execute(
217 "INSERT INTO results (invocation_id, result) VALUES ($1, $2)
218 ON CONFLICT (invocation_id) DO UPDATE SET result = $2",
219 &[&invocation_id.as_str(), &result],
220 )
221 .await
222 .map_err(pg_err)?;
223 Ok(())
224 }
225
226 async fn get_result(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<String>> {
227 let client = self.db.conn().await?;
228 let row = client
229 .query_opt(
230 "SELECT result FROM results WHERE invocation_id = $1",
231 &[&invocation_id.as_str()],
232 )
233 .await
234 .map_err(pg_err)?;
235 Ok(row.map(|r| r.get(0)))
236 }
237
238 async fn store_error(
239 &self,
240 invocation_id: &InvocationId,
241 error: &TaskError,
242 ) -> RustvelloResult<()> {
243 let client = self.db.conn().await?;
244 client
245 .execute(
246 "INSERT INTO errors (invocation_id, error_type, message, traceback) VALUES ($1, $2, $3, $4)
247 ON CONFLICT (invocation_id) DO UPDATE SET error_type = $2, message = $3, traceback = $4",
248 &[
249 &invocation_id.as_str(),
250 &error.error_type,
251 &error.message,
252 &error.traceback as &(dyn tokio_postgres::types::ToSql + Sync),
253 ],
254 )
255 .await
256 .map_err(pg_err)?;
257 Ok(())
258 }
259
260 async fn get_error(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<TaskError>> {
261 let client = self.db.conn().await?;
262 let row = client
263 .query_opt(
264 "SELECT error_type, message, traceback FROM errors WHERE invocation_id = $1",
265 &[&invocation_id.as_str()],
266 )
267 .await
268 .map_err(pg_err)?;
269
270 Ok(row.map(|r| TaskError {
271 error_type: r.get(0),
272 message: r.get(1),
273 traceback: r.get(2),
274 }))
275 }
276
277 async fn add_history(&self, history: &InvocationHistory) -> RustvelloResult<()> {
278 let client = self.db.conn().await?;
279 let runner_id_str = history
280 .status_record
281 .runner_id
282 .as_ref()
283 .map(|r| r.as_str().to_string());
284
285 client
286 .execute(
287 "INSERT INTO history (invocation_id, status, runner_id, timestamp, message, history_timestamp)
288 VALUES ($1, $2, $3, $4, $5, $6)",
289 &[
290 &history.invocation_id.as_str(),
291 &history.status_record.status.to_string(),
292 &runner_id_str as &(dyn tokio_postgres::types::ToSql + Sync),
293 &history.status_record.timestamp,
294 &history.message as &(dyn tokio_postgres::types::ToSql + Sync),
295 &history.history_timestamp as &(dyn tokio_postgres::types::ToSql + Sync),
296 ],
297 )
298 .await
299 .map_err(pg_err)?;
300 Ok(())
301 }
302
303 async fn get_history(
304 &self,
305 invocation_id: &InvocationId,
306 ) -> RustvelloResult<Vec<InvocationHistory>> {
307 let client = self.db.conn().await?;
308
309 let rows = client
310 .query(
311 "SELECT status, runner_id, timestamp, message, history_timestamp FROM history
312 WHERE invocation_id = $1 ORDER BY id",
313 &[&invocation_id.as_str()],
314 )
315 .await
316 .map_err(pg_err)?;
317
318 let mut histories = Vec::with_capacity(rows.len());
319 for row in &rows {
320 let status_str: String = row.get(0);
321 let runner_id: Option<String> = row.get(1);
322 let timestamp: chrono::DateTime<Utc> = row.get(2);
323 let message: Option<String> = row.get(3);
324 let history_timestamp: Option<chrono::DateTime<Utc>> = row.get(4);
325
326 histories.push(InvocationHistory {
327 invocation_id: invocation_id.clone(),
328 status_record: InvocationStatusRecord {
329 status: parse_status(&status_str)?,
330 runner_id: runner_id.clone().map(RunnerId::from_string),
331 timestamp,
332 },
333 message,
334 runner_id: runner_id.map(RunnerId::from_string),
335 registered_by_inv_id: None,
336 history_timestamp,
337 });
338 }
339 Ok(histories)
340 }
341
342 async fn purge(&self) -> RustvelloResult<()> {
343 let client = self.db.conn().await?;
344 client
345 .batch_execute(
346 "DELETE FROM invocations;
347 DELETE FROM calls;
348 DELETE FROM results;
349 DELETE FROM errors;
350 DELETE FROM history;
351 DELETE FROM status_records;
352 DELETE FROM waiting_for;
353 DELETE FROM broker_queue;
354 DELETE FROM workflow_runs;
355 DELETE FROM workflow_data;
356 DELETE FROM app_infos;
357 DELETE FROM workflow_sub_invocations;
358 DELETE FROM runner_contexts;",
359 )
360 .await
361 .map_err(pg_err)?;
362 Ok(())
363 }
364}
365
366#[async_trait]
367impl StateBackendQuery for PostgresStateBackend {
368 async fn get_workflow_invocations(
369 &self,
370 workflow_id: &InvocationId,
371 ) -> RustvelloResult<Vec<InvocationId>> {
372 let client = self.db.conn().await?;
373 let rows = client
374 .query(
375 "SELECT invocation_id FROM invocations WHERE workflow_id = $1",
376 &[&workflow_id.as_str()],
377 )
378 .await
379 .map_err(pg_err)?;
380 Ok(rows
381 .iter()
382 .map(|r| InvocationId::from_string(r.get::<_, String>(0)))
383 .collect())
384 }
385
386 async fn get_child_invocations(
387 &self,
388 parent_invocation_id: &InvocationId,
389 ) -> RustvelloResult<Vec<InvocationId>> {
390 let client = self.db.conn().await?;
391 let rows = client
392 .query(
393 "SELECT invocation_id FROM invocations WHERE parent_invocation_id = $1",
394 &[&parent_invocation_id.as_str()],
395 )
396 .await
397 .map_err(pg_err)?;
398 Ok(rows
399 .iter()
400 .map(|r| InvocationId::from_string(r.get::<_, String>(0)))
401 .collect())
402 }
403
404 async fn store_workflow_run(&self, workflow: &WorkflowIdentity) -> RustvelloResult<()> {
405 let client = self.db.conn().await?;
406 let parent_id = workflow
407 .parent_id
408 .as_ref()
409 .map(|id| id.as_str().to_string());
410 client
411 .execute(
412 "INSERT INTO workflow_runs (workflow_id, workflow_type, parent_workflow_id, depth)
413 VALUES ($1, $2, $3, $4)
414 ON CONFLICT (workflow_id) DO UPDATE SET
415 workflow_type = $2, parent_workflow_id = $3, depth = $4",
416 &[
417 &workflow.workflow_id.as_str(),
418 &workflow.workflow_type.to_string(),
419 &parent_id as &(dyn tokio_postgres::types::ToSql + Sync),
420 &(workflow.depth as i32),
421 ],
422 )
423 .await
424 .map_err(pg_err)?;
425 Ok(())
426 }
427
428 async fn get_all_workflow_types(&self) -> RustvelloResult<Vec<TaskId>> {
429 let client = self.db.conn().await?;
430 let rows = client
431 .query("SELECT DISTINCT workflow_type FROM workflow_runs", &[])
432 .await
433 .map_err(pg_err)?;
434 rows.iter()
435 .map(|r| {
436 let s: String = r.get(0);
437 s.parse::<TaskId>().map_err(|e| {
438 RustvelloError::state_backend(format!("invalid task_id in database: {e}"))
439 })
440 })
441 .collect()
442 }
443
444 async fn get_workflow_runs(
445 &self,
446 workflow_type: &TaskId,
447 ) -> RustvelloResult<Vec<WorkflowIdentity>> {
448 let client = self.db.conn().await?;
449 let rows = client
450 .query(
451 "SELECT workflow_id, workflow_type, parent_workflow_id, depth
452 FROM workflow_runs WHERE workflow_type = $1",
453 &[&workflow_type.to_string()],
454 )
455 .await
456 .map_err(pg_err)?;
457 rows.iter()
458 .map(|r| {
459 let wf_id: String = r.get(0);
460 let wf_type: String = r.get(1);
461 let parent_id: Option<String> = r.get(2);
462 let depth: i32 = r.get(3);
463 let task_id = wf_type.parse::<TaskId>().map_err(|e| {
464 RustvelloError::state_backend(format!(
465 "invalid workflow task_id in database: {e}"
466 ))
467 })?;
468 Ok(WorkflowIdentity {
469 workflow_id: InvocationId::from_string(wf_id),
470 workflow_type: task_id,
471 parent_id: parent_id.map(InvocationId::from_string),
472 depth: u32::try_from(depth).unwrap_or(0),
473 })
474 })
475 .collect()
476 }
477
478 async fn set_workflow_data(
479 &self,
480 workflow_id: &InvocationId,
481 key: &str,
482 value: &str,
483 ) -> RustvelloResult<()> {
484 let client = self.db.conn().await?;
485 let key_s = key.to_string();
486 let value_s = value.to_string();
487 client
488 .execute(
489 "INSERT INTO workflow_data (workflow_id, data_key, data_value)
490 VALUES ($1, $2, $3)
491 ON CONFLICT (workflow_id, data_key) DO UPDATE SET data_value = $3",
492 &[&workflow_id.as_str(), &key_s, &value_s],
493 )
494 .await
495 .map_err(pg_err)?;
496 Ok(())
497 }
498
499 async fn get_workflow_data(
500 &self,
501 workflow_id: &InvocationId,
502 key: &str,
503 ) -> RustvelloResult<Option<String>> {
504 let client = self.db.conn().await?;
505 let key_s = key.to_string();
506 let row = client
507 .query_opt(
508 "SELECT data_value FROM workflow_data WHERE workflow_id = $1 AND data_key = $2",
509 &[&workflow_id.as_str(), &key_s],
510 )
511 .await
512 .map_err(pg_err)?;
513 Ok(row.map(|r| r.get(0)))
514 }
515
516 async fn store_app_info(&self, app_id: &str, info_json: &str) -> RustvelloResult<()> {
517 let client = self.db.conn().await?;
518 let app_id_s = app_id.to_string();
519 let info_s = info_json.to_string();
520 client
521 .execute(
522 "INSERT INTO app_infos (app_id, info_json) VALUES ($1, $2)
523 ON CONFLICT (app_id) DO UPDATE SET info_json = $2",
524 &[&app_id_s, &info_s],
525 )
526 .await
527 .map_err(pg_err)?;
528 Ok(())
529 }
530
531 async fn get_app_info(&self, app_id: &str) -> RustvelloResult<Option<String>> {
532 let client = self.db.conn().await?;
533 let app_id_s = app_id.to_string();
534 let row = client
535 .query_opt(
536 "SELECT info_json FROM app_infos WHERE app_id = $1",
537 &[&app_id_s],
538 )
539 .await
540 .map_err(pg_err)?;
541 Ok(row.map(|r| r.get(0)))
542 }
543
544 async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>> {
545 let client = self.db.conn().await?;
546 let rows = client
547 .query("SELECT app_id, info_json FROM app_infos", &[])
548 .await
549 .map_err(pg_err)?;
550 Ok(rows.iter().map(|r| (r.get(0), r.get(1))).collect())
551 }
552
553 async fn store_workflow_sub_invocation(
554 &self,
555 workflow_id: &InvocationId,
556 sub_inv_id: &InvocationId,
557 ) -> RustvelloResult<()> {
558 let client = self.db.conn().await?;
559 client
560 .execute(
561 "INSERT INTO workflow_sub_invocations (workflow_id, sub_invocation_id)
562 VALUES ($1, $2) ON CONFLICT DO NOTHING",
563 &[&workflow_id.as_str(), &sub_inv_id.as_str()],
564 )
565 .await
566 .map_err(pg_err)?;
567 Ok(())
568 }
569
570 async fn get_workflow_sub_invocations(
571 &self,
572 workflow_id: &InvocationId,
573 ) -> RustvelloResult<Vec<InvocationId>> {
574 let client = self.db.conn().await?;
575 let rows = client
576 .query(
577 "SELECT sub_invocation_id FROM workflow_sub_invocations WHERE workflow_id = $1",
578 &[&workflow_id.as_str()],
579 )
580 .await
581 .map_err(pg_err)?;
582 Ok(rows
583 .iter()
584 .map(|r| InvocationId::from_string(r.get::<_, String>(0)))
585 .collect())
586 }
587
588 async fn get_all_workflow_runs(&self) -> RustvelloResult<Vec<WorkflowIdentity>> {
589 let client = self.db.conn().await?;
590 let rows = client
591 .query(
592 "SELECT workflow_id, workflow_type, parent_workflow_id, depth FROM workflow_runs",
593 &[],
594 )
595 .await
596 .map_err(pg_err)?;
597 rows.iter()
598 .map(|r| {
599 let task_id = r.get::<_, String>(1).parse::<TaskId>().map_err(|e| {
600 RustvelloError::state_backend(format!(
601 "invalid workflow task_id in database: {e}"
602 ))
603 })?;
604 Ok(WorkflowIdentity {
605 workflow_id: InvocationId::from_string(r.get::<_, String>(0)),
606 workflow_type: task_id,
607 parent_id: r.get::<_, Option<String>>(2).map(InvocationId::from_string),
608 depth: u32::try_from(r.get::<_, i32>(3)).unwrap_or(0),
609 })
610 })
611 .collect()
612 }
613}
614
615#[async_trait]
616impl StateBackendRunner for PostgresStateBackend {
617 async fn store_runner_context(&self, context: &StoredRunnerContext) -> RustvelloResult<()> {
618 let client = self.db.conn().await?;
619 client
620 .execute(
621 "INSERT INTO runner_contexts
622 (runner_id, runner_cls, pid, hostname, thread_id, started_at,
623 parent_runner_id, parent_runner_cls)
624 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
625 ON CONFLICT (runner_id) DO UPDATE SET
626 runner_cls = $2, pid = $3, hostname = $4, thread_id = $5,
627 started_at = $6, parent_runner_id = $7, parent_runner_cls = $8",
628 &[
629 &context.runner_id,
630 &context.runner_cls,
631 &i32::try_from(context.pid).unwrap_or(0),
632 &context.hostname,
633 &(context.thread_id as i64),
634 &context.started_at,
635 &context.parent_runner_id as &(dyn tokio_postgres::types::ToSql + Sync),
636 &context.parent_runner_cls as &(dyn tokio_postgres::types::ToSql + Sync),
637 ],
638 )
639 .await
640 .map_err(pg_err)?;
641 Ok(())
642 }
643
644 async fn get_runner_context(
645 &self,
646 runner_id: &str,
647 ) -> RustvelloResult<Option<StoredRunnerContext>> {
648 let client = self.db.conn().await?;
649 let runner_id_s = runner_id.to_string();
650 let row = client
651 .query_opt(
652 "SELECT runner_id, runner_cls, pid, hostname, thread_id, started_at,
653 parent_runner_id, parent_runner_cls
654 FROM runner_contexts WHERE runner_id = $1",
655 &[&runner_id_s],
656 )
657 .await
658 .map_err(pg_err)?;
659 Ok(row.map(|r| parse_pg_runner_row(&r)))
660 }
661
662 async fn get_runner_contexts_by_parent(
663 &self,
664 parent_runner_id: &str,
665 ) -> RustvelloResult<Vec<StoredRunnerContext>> {
666 let client = self.db.conn().await?;
667 let parent_id_s = parent_runner_id.to_string();
668 let rows = client
669 .query(
670 "SELECT runner_id, runner_cls, pid, hostname, thread_id, started_at,
671 parent_runner_id, parent_runner_cls
672 FROM runner_contexts WHERE parent_runner_id = $1",
673 &[&parent_id_s],
674 )
675 .await
676 .map_err(pg_err)?;
677 Ok(rows.iter().map(parse_pg_runner_row).collect())
678 }
679
680 async fn get_invocation_ids_by_runner(
681 &self,
682 runner_id: &str,
683 limit: usize,
684 offset: usize,
685 ) -> RustvelloResult<Vec<InvocationId>> {
686 let client = self.db.conn().await?;
687 let runner_id_s = runner_id.to_string();
688 let rows = if limit > 0 {
689 client
690 .query(
691 "SELECT DISTINCT invocation_id FROM history
692 WHERE runner_id = $1 LIMIT $2 OFFSET $3",
693 &[&runner_id_s, &(limit as i64), &(offset as i64)],
694 )
695 .await
696 .map_err(pg_err)?
697 } else {
698 client
699 .query(
700 "SELECT DISTINCT invocation_id FROM history
701 WHERE runner_id = $1 OFFSET $2",
702 &[&runner_id_s, &(offset as i64)],
703 )
704 .await
705 .map_err(pg_err)?
706 };
707 Ok(rows
708 .iter()
709 .map(|r| InvocationId::from_string(r.get::<_, String>(0)))
710 .collect())
711 }
712
713 async fn count_invocations_by_runner(&self, runner_id: &str) -> RustvelloResult<usize> {
714 let client = self.db.conn().await?;
715 let runner_id_s = runner_id.to_string();
716 let row = client
717 .query_one(
718 "SELECT COUNT(DISTINCT invocation_id) FROM history WHERE runner_id = $1",
719 &[&runner_id_s],
720 )
721 .await
722 .map_err(pg_err)?;
723 let count: i64 = row.get(0);
724 Ok(count as usize)
725 }
726
727 async fn get_history_in_timerange(
728 &self,
729 start: chrono::DateTime<chrono::Utc>,
730 end: chrono::DateTime<chrono::Utc>,
731 limit: usize,
732 offset: usize,
733 ) -> RustvelloResult<Vec<InvocationHistory>> {
734 let client = self.db.conn().await?;
735 let rows = if limit > 0 {
736 client
737 .query(
738 "SELECT invocation_id, status, runner_id, timestamp, message, history_timestamp
739 FROM history
740 WHERE COALESCE(history_timestamp, timestamp) >= $1
741 AND COALESCE(history_timestamp, timestamp) <= $2
742 ORDER BY COALESCE(history_timestamp, timestamp) ASC
743 LIMIT $3 OFFSET $4",
744 &[&start, &end, &(limit as i64), &(offset as i64)],
745 )
746 .await
747 .map_err(pg_err)?
748 } else {
749 client
750 .query(
751 "SELECT invocation_id, status, runner_id, timestamp, message, history_timestamp
752 FROM history
753 WHERE COALESCE(history_timestamp, timestamp) >= $1
754 AND COALESCE(history_timestamp, timestamp) <= $2
755 ORDER BY COALESCE(history_timestamp, timestamp) ASC
756 OFFSET $3",
757 &[&start, &end, &(offset as i64)],
758 )
759 .await
760 .map_err(pg_err)?
761 };
762 rows.iter()
763 .map(|r| {
764 let inv_id: String = r.get(0);
765 let status_str: String = r.get(1);
766 let runner_id: Option<String> = r.get(2);
767 let timestamp: chrono::DateTime<Utc> = r.get(3);
768 let message: Option<String> = r.get(4);
769 let history_timestamp: Option<chrono::DateTime<Utc>> = r.get(5);
770 Ok(InvocationHistory {
771 invocation_id: InvocationId::from_string(inv_id),
772 status_record: InvocationStatusRecord {
773 status: parse_status(&status_str)?,
774 runner_id: runner_id.clone().map(RunnerId::from_string),
775 timestamp,
776 },
777 message,
778 runner_id: runner_id.map(RunnerId::from_string),
779 registered_by_inv_id: None,
780 history_timestamp,
781 })
782 })
783 .collect()
784 }
785
786 async fn get_matching_runner_contexts(
787 &self,
788 partial_id: &str,
789 ) -> RustvelloResult<Vec<StoredRunnerContext>> {
790 let client = self.db.conn().await?;
791 let pattern = format!("%{partial_id}%");
792 let rows = client
793 .query(
794 "SELECT runner_id, runner_cls, pid, hostname, thread_id, started_at,
795 parent_runner_id, parent_runner_cls
796 FROM runner_contexts WHERE runner_id LIKE $1",
797 &[&pattern],
798 )
799 .await
800 .map_err(pg_err)?;
801 Ok(rows.iter().map(parse_pg_runner_row).collect())
802 }
803}
804
805fn parse_pg_runner_row(row: &tokio_postgres::Row) -> StoredRunnerContext {
806 StoredRunnerContext {
807 runner_id: row.get(0),
808 runner_cls: row.get(1),
809 pid: u32::try_from(row.get::<_, i32>(2)).unwrap_or(0),
810 hostname: row.get(3),
811 thread_id: u64::try_from(row.get::<_, i64>(4)).unwrap_or(0),
812 started_at: row.get(5),
813 parent_runner_id: row.get(6),
814 parent_runner_cls: row.get(7),
815 }
816}