Skip to main content

rustvello_postgres/
state_backend.rs

1//! PostgreSQL-backed [`StateBackend`] implementation.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use chrono::Utc;
7
8use rustvello_core::error::{RustvelloError, RustvelloResult, TaskError};
9use rustvello_core::state_backend::{
10    StateBackendCore, StateBackendQuery, StateBackendRunner, StoredRunnerContext,
11};
12use rustvello_proto::call::{CallDTO, SerializedArguments};
13use rustvello_proto::identifiers::{CallId, InvocationId, RunnerId, TaskId};
14use rustvello_proto::invocation::{InvocationDTO, InvocationHistory, WorkflowIdentity};
15use rustvello_proto::status::InvocationStatusRecord;
16
17use crate::db::{parse_status, pg_err, Database};
18
19/// PostgreSQL-backed state backend implementation.
20pub struct PostgresStateBackend {
21    db: Arc<Database>,
22}
23
24impl PostgresStateBackend {
25    pub fn new(db: Arc<Database>) -> Self {
26        Self { db }
27    }
28}
29
30#[async_trait]
31impl StateBackendCore for PostgresStateBackend {
32    async fn upsert_invocation(
33        &self,
34        invocation: &InvocationDTO,
35        call: &CallDTO,
36    ) -> RustvelloResult<()> {
37        let mut client = self.db.conn().await?;
38        let tx = client.transaction().await.map_err(pg_err)?;
39
40        let args_json = serde_json::to_string(&call.serialized_arguments.0).map_err(|e| {
41            RustvelloError::Serialization {
42                message: e.to_string(),
43            }
44        })?;
45
46        let (parent_inv_id, wf_id, wf_type, wf_depth) = match &invocation.workflow {
47            Some(wf) => (
48                invocation
49                    .parent_invocation_id
50                    .as_ref()
51                    .map(|id| id.as_str().to_string()),
52                Some(wf.workflow_id.as_str().to_string()),
53                Some(wf.workflow_type.to_string()),
54                Some(wf.depth as i32),
55            ),
56            None => (
57                invocation
58                    .parent_invocation_id
59                    .as_ref()
60                    .map(|id| id.as_str().to_string()),
61                None,
62                None,
63                None,
64            ),
65        };
66
67        tx
68            .execute(
69                "INSERT INTO invocations (invocation_id, task_id, call_id, status, created_at, updated_at,
70                    parent_invocation_id, workflow_id, workflow_type, workflow_depth)
71                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
72                 ON CONFLICT (invocation_id) DO UPDATE SET
73                    task_id = $2, call_id = $3, status = $4, updated_at = $6,
74                    parent_invocation_id = $7, workflow_id = $8, workflow_type = $9, workflow_depth = $10",
75                &[
76                    &invocation.invocation_id.as_str(),
77                    &invocation.task_id.to_string(),
78                    &invocation.call_id.to_string(),
79                    &invocation.status.to_string(),
80                    &invocation.created_at,
81                    &invocation.updated_at,
82                    &parent_inv_id as &(dyn tokio_postgres::types::ToSql + Sync),
83                    &wf_id as &(dyn tokio_postgres::types::ToSql + Sync),
84                    &wf_type as &(dyn tokio_postgres::types::ToSql + Sync),
85                    &wf_depth as &(dyn tokio_postgres::types::ToSql + Sync),
86                ],
87            )
88            .await
89            .map_err(pg_err)?;
90
91        tx.execute(
92            "INSERT INTO calls (call_id, task_id, serialized_arguments) VALUES ($1, $2, $3)
93                 ON CONFLICT (call_id) DO UPDATE SET task_id = $2, serialized_arguments = $3",
94            &[
95                &call.call_id.to_string(),
96                &call.task_id.to_string(),
97                &args_json,
98            ],
99        )
100        .await
101        .map_err(pg_err)?;
102
103        tx.commit().await.map_err(pg_err)?;
104
105        Ok(())
106    }
107
108    async fn get_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<InvocationDTO> {
109        let client = self.db.conn().await?;
110
111        let row = client
112            .query_opt(
113                "SELECT task_id, call_id, status, created_at, updated_at,
114                        parent_invocation_id, workflow_id, workflow_type, workflow_depth
115                 FROM invocations WHERE invocation_id = $1",
116                &[&invocation_id.as_str()],
117            )
118            .await
119            .map_err(pg_err)?
120            .ok_or_else(|| RustvelloError::InvocationNotFound {
121                invocation_id: invocation_id.clone(),
122            })?;
123
124        let task_id_str: String = row.get(0);
125        let call_id_str: String = row.get(1);
126        let status_str: String = row.get(2);
127        let created_at: chrono::DateTime<Utc> = row.get(3);
128        let updated_at: chrono::DateTime<Utc> = row.get(4);
129        let parent_inv_id: Option<String> = row.get(5);
130        let wf_id: Option<String> = row.get(6);
131        let wf_type: Option<String> = row.get(7);
132        let wf_depth: Option<i32> = row.get(8);
133
134        let task_id: TaskId = task_id_str.parse().map_err(|e| {
135            RustvelloError::state_backend(format!("invalid task_id in database: {e}"))
136        })?;
137
138        let call_id: CallId = call_id_str.parse().map_err(|e| {
139            RustvelloError::state_backend(format!("invalid call_id in database: {e}"))
140        })?;
141
142        let parent_invocation_id = parent_inv_id.map(InvocationId::from_string);
143
144        let workflow = match (wf_id, wf_type) {
145            (Some(wf_id_str), Some(wf_type_str)) => {
146                let wf_task_id: TaskId = wf_type_str.parse().map_err(|e| {
147                    RustvelloError::state_backend(format!(
148                        "invalid workflow task_id in database: {e}"
149                    ))
150                })?;
151                Some(WorkflowIdentity {
152                    workflow_id: InvocationId::from_string(wf_id_str),
153                    workflow_type: wf_task_id,
154                    parent_id: None,
155                    depth: u32::try_from(wf_depth.unwrap_or(0)).unwrap_or(0),
156                })
157            }
158            _ => None,
159        };
160
161        Ok(InvocationDTO {
162            invocation_id: invocation_id.clone(),
163            task_id,
164            call_id,
165            status: parse_status(&status_str)?,
166            created_at,
167            updated_at,
168            parent_invocation_id,
169            workflow,
170        })
171    }
172
173    async fn get_call(&self, call_id: &CallId) -> RustvelloResult<CallDTO> {
174        let client = self.db.conn().await?;
175        let call_id_str = call_id.to_string();
176
177        let row = client
178            .query_opt(
179                "SELECT task_id, serialized_arguments FROM calls WHERE call_id = $1",
180                &[&call_id_str],
181            )
182            .await
183            .map_err(pg_err)?
184            .ok_or_else(|| {
185                RustvelloError::state_backend(format!("call not found: {}", call_id_str))
186            })?;
187
188        let task_id_str: String = row.get(0);
189        let args_json: String = row.get(1);
190
191        let task_id: TaskId = task_id_str.parse().map_err(|e| {
192            RustvelloError::state_backend(format!("invalid task_id in database: {e}"))
193        })?;
194
195        let args_map: std::collections::BTreeMap<String, String> = serde_json::from_str(&args_json)
196            .map_err(|e| RustvelloError::Serialization {
197                message: e.to_string(),
198            })?;
199
200        let args = SerializedArguments(args_map);
201
202        Ok(CallDTO {
203            call_id: call_id.clone(),
204            task_id,
205            serialized_arguments: args,
206        })
207    }
208
209    async fn store_result(
210        &self,
211        invocation_id: &InvocationId,
212        result: &str,
213    ) -> RustvelloResult<()> {
214        let client = self.db.conn().await?;
215        client
216            .execute(
217                "INSERT INTO results (invocation_id, result) VALUES ($1, $2)
218                 ON CONFLICT (invocation_id) DO UPDATE SET result = $2",
219                &[&invocation_id.as_str(), &result],
220            )
221            .await
222            .map_err(pg_err)?;
223        Ok(())
224    }
225
226    async fn get_result(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<String>> {
227        let client = self.db.conn().await?;
228        let row = client
229            .query_opt(
230                "SELECT result FROM results WHERE invocation_id = $1",
231                &[&invocation_id.as_str()],
232            )
233            .await
234            .map_err(pg_err)?;
235        Ok(row.map(|r| r.get(0)))
236    }
237
238    async fn store_error(
239        &self,
240        invocation_id: &InvocationId,
241        error: &TaskError,
242    ) -> RustvelloResult<()> {
243        let client = self.db.conn().await?;
244        client
245            .execute(
246                "INSERT INTO errors (invocation_id, error_type, message, traceback) VALUES ($1, $2, $3, $4)
247                 ON CONFLICT (invocation_id) DO UPDATE SET error_type = $2, message = $3, traceback = $4",
248                &[
249                    &invocation_id.as_str(),
250                    &error.error_type,
251                    &error.message,
252                    &error.traceback as &(dyn tokio_postgres::types::ToSql + Sync),
253                ],
254            )
255            .await
256            .map_err(pg_err)?;
257        Ok(())
258    }
259
260    async fn get_error(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<TaskError>> {
261        let client = self.db.conn().await?;
262        let row = client
263            .query_opt(
264                "SELECT error_type, message, traceback FROM errors WHERE invocation_id = $1",
265                &[&invocation_id.as_str()],
266            )
267            .await
268            .map_err(pg_err)?;
269
270        Ok(row.map(|r| TaskError {
271            error_type: r.get(0),
272            message: r.get(1),
273            traceback: r.get(2),
274        }))
275    }
276
277    async fn add_history(&self, history: &InvocationHistory) -> RustvelloResult<()> {
278        let client = self.db.conn().await?;
279        let runner_id_str = history
280            .status_record
281            .runner_id
282            .as_ref()
283            .map(|r| r.as_str().to_string());
284
285        client
286            .execute(
287                "INSERT INTO history (invocation_id, status, runner_id, timestamp, message, history_timestamp)
288                 VALUES ($1, $2, $3, $4, $5, $6)",
289                &[
290                    &history.invocation_id.as_str(),
291                    &history.status_record.status.to_string(),
292                    &runner_id_str as &(dyn tokio_postgres::types::ToSql + Sync),
293                    &history.status_record.timestamp,
294                    &history.message as &(dyn tokio_postgres::types::ToSql + Sync),
295                    &history.history_timestamp as &(dyn tokio_postgres::types::ToSql + Sync),
296                ],
297            )
298            .await
299            .map_err(pg_err)?;
300        Ok(())
301    }
302
303    async fn get_history(
304        &self,
305        invocation_id: &InvocationId,
306    ) -> RustvelloResult<Vec<InvocationHistory>> {
307        let client = self.db.conn().await?;
308
309        let rows = client
310            .query(
311                "SELECT status, runner_id, timestamp, message, history_timestamp FROM history
312                 WHERE invocation_id = $1 ORDER BY id",
313                &[&invocation_id.as_str()],
314            )
315            .await
316            .map_err(pg_err)?;
317
318        let mut histories = Vec::with_capacity(rows.len());
319        for row in &rows {
320            let status_str: String = row.get(0);
321            let runner_id: Option<String> = row.get(1);
322            let timestamp: chrono::DateTime<Utc> = row.get(2);
323            let message: Option<String> = row.get(3);
324            let history_timestamp: Option<chrono::DateTime<Utc>> = row.get(4);
325
326            histories.push(InvocationHistory {
327                invocation_id: invocation_id.clone(),
328                status_record: InvocationStatusRecord {
329                    status: parse_status(&status_str)?,
330                    runner_id: runner_id.clone().map(RunnerId::from_string),
331                    timestamp,
332                },
333                message,
334                runner_id: runner_id.map(RunnerId::from_string),
335                registered_by_inv_id: None,
336                history_timestamp,
337            });
338        }
339        Ok(histories)
340    }
341
342    async fn purge(&self) -> RustvelloResult<()> {
343        let client = self.db.conn().await?;
344        client
345            .batch_execute(
346                "DELETE FROM invocations;
347                 DELETE FROM calls;
348                 DELETE FROM results;
349                 DELETE FROM errors;
350                 DELETE FROM history;
351                 DELETE FROM status_records;
352                 DELETE FROM waiting_for;
353                 DELETE FROM broker_queue;
354                 DELETE FROM workflow_runs;
355                 DELETE FROM workflow_data;
356                 DELETE FROM app_infos;
357                 DELETE FROM workflow_sub_invocations;
358                 DELETE FROM runner_contexts;",
359            )
360            .await
361            .map_err(pg_err)?;
362        Ok(())
363    }
364}
365
366#[async_trait]
367impl StateBackendQuery for PostgresStateBackend {
368    async fn get_workflow_invocations(
369        &self,
370        workflow_id: &InvocationId,
371    ) -> RustvelloResult<Vec<InvocationId>> {
372        let client = self.db.conn().await?;
373        let rows = client
374            .query(
375                "SELECT invocation_id FROM invocations WHERE workflow_id = $1",
376                &[&workflow_id.as_str()],
377            )
378            .await
379            .map_err(pg_err)?;
380        Ok(rows
381            .iter()
382            .map(|r| InvocationId::from_string(r.get::<_, String>(0)))
383            .collect())
384    }
385
386    async fn get_child_invocations(
387        &self,
388        parent_invocation_id: &InvocationId,
389    ) -> RustvelloResult<Vec<InvocationId>> {
390        let client = self.db.conn().await?;
391        let rows = client
392            .query(
393                "SELECT invocation_id FROM invocations WHERE parent_invocation_id = $1",
394                &[&parent_invocation_id.as_str()],
395            )
396            .await
397            .map_err(pg_err)?;
398        Ok(rows
399            .iter()
400            .map(|r| InvocationId::from_string(r.get::<_, String>(0)))
401            .collect())
402    }
403
404    async fn store_workflow_run(&self, workflow: &WorkflowIdentity) -> RustvelloResult<()> {
405        let client = self.db.conn().await?;
406        let parent_id = workflow
407            .parent_id
408            .as_ref()
409            .map(|id| id.as_str().to_string());
410        client
411            .execute(
412                "INSERT INTO workflow_runs (workflow_id, workflow_type, parent_workflow_id, depth)
413                 VALUES ($1, $2, $3, $4)
414                 ON CONFLICT (workflow_id) DO UPDATE SET
415                    workflow_type = $2, parent_workflow_id = $3, depth = $4",
416                &[
417                    &workflow.workflow_id.as_str(),
418                    &workflow.workflow_type.to_string(),
419                    &parent_id as &(dyn tokio_postgres::types::ToSql + Sync),
420                    &(workflow.depth as i32),
421                ],
422            )
423            .await
424            .map_err(pg_err)?;
425        Ok(())
426    }
427
428    async fn get_all_workflow_types(&self) -> RustvelloResult<Vec<TaskId>> {
429        let client = self.db.conn().await?;
430        let rows = client
431            .query("SELECT DISTINCT workflow_type FROM workflow_runs", &[])
432            .await
433            .map_err(pg_err)?;
434        rows.iter()
435            .map(|r| {
436                let s: String = r.get(0);
437                s.parse::<TaskId>().map_err(|e| {
438                    RustvelloError::state_backend(format!("invalid task_id in database: {e}"))
439                })
440            })
441            .collect()
442    }
443
444    async fn get_workflow_runs(
445        &self,
446        workflow_type: &TaskId,
447    ) -> RustvelloResult<Vec<WorkflowIdentity>> {
448        let client = self.db.conn().await?;
449        let rows = client
450            .query(
451                "SELECT workflow_id, workflow_type, parent_workflow_id, depth
452                 FROM workflow_runs WHERE workflow_type = $1",
453                &[&workflow_type.to_string()],
454            )
455            .await
456            .map_err(pg_err)?;
457        rows.iter()
458            .map(|r| {
459                let wf_id: String = r.get(0);
460                let wf_type: String = r.get(1);
461                let parent_id: Option<String> = r.get(2);
462                let depth: i32 = r.get(3);
463                let task_id = wf_type.parse::<TaskId>().map_err(|e| {
464                    RustvelloError::state_backend(format!(
465                        "invalid workflow task_id in database: {e}"
466                    ))
467                })?;
468                Ok(WorkflowIdentity {
469                    workflow_id: InvocationId::from_string(wf_id),
470                    workflow_type: task_id,
471                    parent_id: parent_id.map(InvocationId::from_string),
472                    depth: u32::try_from(depth).unwrap_or(0),
473                })
474            })
475            .collect()
476    }
477
478    async fn set_workflow_data(
479        &self,
480        workflow_id: &InvocationId,
481        key: &str,
482        value: &str,
483    ) -> RustvelloResult<()> {
484        let client = self.db.conn().await?;
485        let key_s = key.to_string();
486        let value_s = value.to_string();
487        client
488            .execute(
489                "INSERT INTO workflow_data (workflow_id, data_key, data_value)
490                 VALUES ($1, $2, $3)
491                 ON CONFLICT (workflow_id, data_key) DO UPDATE SET data_value = $3",
492                &[&workflow_id.as_str(), &key_s, &value_s],
493            )
494            .await
495            .map_err(pg_err)?;
496        Ok(())
497    }
498
499    async fn get_workflow_data(
500        &self,
501        workflow_id: &InvocationId,
502        key: &str,
503    ) -> RustvelloResult<Option<String>> {
504        let client = self.db.conn().await?;
505        let key_s = key.to_string();
506        let row = client
507            .query_opt(
508                "SELECT data_value FROM workflow_data WHERE workflow_id = $1 AND data_key = $2",
509                &[&workflow_id.as_str(), &key_s],
510            )
511            .await
512            .map_err(pg_err)?;
513        Ok(row.map(|r| r.get(0)))
514    }
515
516    async fn store_app_info(&self, app_id: &str, info_json: &str) -> RustvelloResult<()> {
517        let client = self.db.conn().await?;
518        let app_id_s = app_id.to_string();
519        let info_s = info_json.to_string();
520        client
521            .execute(
522                "INSERT INTO app_infos (app_id, info_json) VALUES ($1, $2)
523                 ON CONFLICT (app_id) DO UPDATE SET info_json = $2",
524                &[&app_id_s, &info_s],
525            )
526            .await
527            .map_err(pg_err)?;
528        Ok(())
529    }
530
531    async fn get_app_info(&self, app_id: &str) -> RustvelloResult<Option<String>> {
532        let client = self.db.conn().await?;
533        let app_id_s = app_id.to_string();
534        let row = client
535            .query_opt(
536                "SELECT info_json FROM app_infos WHERE app_id = $1",
537                &[&app_id_s],
538            )
539            .await
540            .map_err(pg_err)?;
541        Ok(row.map(|r| r.get(0)))
542    }
543
544    async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>> {
545        let client = self.db.conn().await?;
546        let rows = client
547            .query("SELECT app_id, info_json FROM app_infos", &[])
548            .await
549            .map_err(pg_err)?;
550        Ok(rows.iter().map(|r| (r.get(0), r.get(1))).collect())
551    }
552
553    async fn store_workflow_sub_invocation(
554        &self,
555        workflow_id: &InvocationId,
556        sub_inv_id: &InvocationId,
557    ) -> RustvelloResult<()> {
558        let client = self.db.conn().await?;
559        client
560            .execute(
561                "INSERT INTO workflow_sub_invocations (workflow_id, sub_invocation_id)
562                 VALUES ($1, $2) ON CONFLICT DO NOTHING",
563                &[&workflow_id.as_str(), &sub_inv_id.as_str()],
564            )
565            .await
566            .map_err(pg_err)?;
567        Ok(())
568    }
569
570    async fn get_workflow_sub_invocations(
571        &self,
572        workflow_id: &InvocationId,
573    ) -> RustvelloResult<Vec<InvocationId>> {
574        let client = self.db.conn().await?;
575        let rows = client
576            .query(
577                "SELECT sub_invocation_id FROM workflow_sub_invocations WHERE workflow_id = $1",
578                &[&workflow_id.as_str()],
579            )
580            .await
581            .map_err(pg_err)?;
582        Ok(rows
583            .iter()
584            .map(|r| InvocationId::from_string(r.get::<_, String>(0)))
585            .collect())
586    }
587
588    async fn get_all_workflow_runs(&self) -> RustvelloResult<Vec<WorkflowIdentity>> {
589        let client = self.db.conn().await?;
590        let rows = client
591            .query(
592                "SELECT workflow_id, workflow_type, parent_workflow_id, depth FROM workflow_runs",
593                &[],
594            )
595            .await
596            .map_err(pg_err)?;
597        rows.iter()
598            .map(|r| {
599                let task_id = r.get::<_, String>(1).parse::<TaskId>().map_err(|e| {
600                    RustvelloError::state_backend(format!(
601                        "invalid workflow task_id in database: {e}"
602                    ))
603                })?;
604                Ok(WorkflowIdentity {
605                    workflow_id: InvocationId::from_string(r.get::<_, String>(0)),
606                    workflow_type: task_id,
607                    parent_id: r.get::<_, Option<String>>(2).map(InvocationId::from_string),
608                    depth: u32::try_from(r.get::<_, i32>(3)).unwrap_or(0),
609                })
610            })
611            .collect()
612    }
613}
614
615#[async_trait]
616impl StateBackendRunner for PostgresStateBackend {
617    async fn store_runner_context(&self, context: &StoredRunnerContext) -> RustvelloResult<()> {
618        let client = self.db.conn().await?;
619        client
620            .execute(
621                "INSERT INTO runner_contexts
622                 (runner_id, runner_cls, pid, hostname, thread_id, started_at,
623                  parent_runner_id, parent_runner_cls)
624                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
625                 ON CONFLICT (runner_id) DO UPDATE SET
626                    runner_cls = $2, pid = $3, hostname = $4, thread_id = $5,
627                    started_at = $6, parent_runner_id = $7, parent_runner_cls = $8",
628                &[
629                    &context.runner_id,
630                    &context.runner_cls,
631                    &i32::try_from(context.pid).unwrap_or(0),
632                    &context.hostname,
633                    &(context.thread_id as i64),
634                    &context.started_at,
635                    &context.parent_runner_id as &(dyn tokio_postgres::types::ToSql + Sync),
636                    &context.parent_runner_cls as &(dyn tokio_postgres::types::ToSql + Sync),
637                ],
638            )
639            .await
640            .map_err(pg_err)?;
641        Ok(())
642    }
643
644    async fn get_runner_context(
645        &self,
646        runner_id: &str,
647    ) -> RustvelloResult<Option<StoredRunnerContext>> {
648        let client = self.db.conn().await?;
649        let runner_id_s = runner_id.to_string();
650        let row = client
651            .query_opt(
652                "SELECT runner_id, runner_cls, pid, hostname, thread_id, started_at,
653                        parent_runner_id, parent_runner_cls
654                 FROM runner_contexts WHERE runner_id = $1",
655                &[&runner_id_s],
656            )
657            .await
658            .map_err(pg_err)?;
659        Ok(row.map(|r| parse_pg_runner_row(&r)))
660    }
661
662    async fn get_runner_contexts_by_parent(
663        &self,
664        parent_runner_id: &str,
665    ) -> RustvelloResult<Vec<StoredRunnerContext>> {
666        let client = self.db.conn().await?;
667        let parent_id_s = parent_runner_id.to_string();
668        let rows = client
669            .query(
670                "SELECT runner_id, runner_cls, pid, hostname, thread_id, started_at,
671                        parent_runner_id, parent_runner_cls
672                 FROM runner_contexts WHERE parent_runner_id = $1",
673                &[&parent_id_s],
674            )
675            .await
676            .map_err(pg_err)?;
677        Ok(rows.iter().map(parse_pg_runner_row).collect())
678    }
679
680    async fn get_invocation_ids_by_runner(
681        &self,
682        runner_id: &str,
683        limit: usize,
684        offset: usize,
685    ) -> RustvelloResult<Vec<InvocationId>> {
686        let client = self.db.conn().await?;
687        let runner_id_s = runner_id.to_string();
688        let rows = if limit > 0 {
689            client
690                .query(
691                    "SELECT DISTINCT invocation_id FROM history
692                     WHERE runner_id = $1 LIMIT $2 OFFSET $3",
693                    &[&runner_id_s, &(limit as i64), &(offset as i64)],
694                )
695                .await
696                .map_err(pg_err)?
697        } else {
698            client
699                .query(
700                    "SELECT DISTINCT invocation_id FROM history
701                     WHERE runner_id = $1 OFFSET $2",
702                    &[&runner_id_s, &(offset as i64)],
703                )
704                .await
705                .map_err(pg_err)?
706        };
707        Ok(rows
708            .iter()
709            .map(|r| InvocationId::from_string(r.get::<_, String>(0)))
710            .collect())
711    }
712
713    async fn count_invocations_by_runner(&self, runner_id: &str) -> RustvelloResult<usize> {
714        let client = self.db.conn().await?;
715        let runner_id_s = runner_id.to_string();
716        let row = client
717            .query_one(
718                "SELECT COUNT(DISTINCT invocation_id) FROM history WHERE runner_id = $1",
719                &[&runner_id_s],
720            )
721            .await
722            .map_err(pg_err)?;
723        let count: i64 = row.get(0);
724        Ok(count as usize)
725    }
726
727    async fn get_history_in_timerange(
728        &self,
729        start: chrono::DateTime<chrono::Utc>,
730        end: chrono::DateTime<chrono::Utc>,
731        limit: usize,
732        offset: usize,
733    ) -> RustvelloResult<Vec<InvocationHistory>> {
734        let client = self.db.conn().await?;
735        let rows = if limit > 0 {
736            client
737                .query(
738                    "SELECT invocation_id, status, runner_id, timestamp, message, history_timestamp
739                     FROM history
740                     WHERE COALESCE(history_timestamp, timestamp) >= $1
741                       AND COALESCE(history_timestamp, timestamp) <= $2
742                     ORDER BY COALESCE(history_timestamp, timestamp) ASC
743                     LIMIT $3 OFFSET $4",
744                    &[&start, &end, &(limit as i64), &(offset as i64)],
745                )
746                .await
747                .map_err(pg_err)?
748        } else {
749            client
750                .query(
751                    "SELECT invocation_id, status, runner_id, timestamp, message, history_timestamp
752                     FROM history
753                     WHERE COALESCE(history_timestamp, timestamp) >= $1
754                       AND COALESCE(history_timestamp, timestamp) <= $2
755                     ORDER BY COALESCE(history_timestamp, timestamp) ASC
756                     OFFSET $3",
757                    &[&start, &end, &(offset as i64)],
758                )
759                .await
760                .map_err(pg_err)?
761        };
762        rows.iter()
763            .map(|r| {
764                let inv_id: String = r.get(0);
765                let status_str: String = r.get(1);
766                let runner_id: Option<String> = r.get(2);
767                let timestamp: chrono::DateTime<Utc> = r.get(3);
768                let message: Option<String> = r.get(4);
769                let history_timestamp: Option<chrono::DateTime<Utc>> = r.get(5);
770                Ok(InvocationHistory {
771                    invocation_id: InvocationId::from_string(inv_id),
772                    status_record: InvocationStatusRecord {
773                        status: parse_status(&status_str)?,
774                        runner_id: runner_id.clone().map(RunnerId::from_string),
775                        timestamp,
776                    },
777                    message,
778                    runner_id: runner_id.map(RunnerId::from_string),
779                    registered_by_inv_id: None,
780                    history_timestamp,
781                })
782            })
783            .collect()
784    }
785
786    async fn get_matching_runner_contexts(
787        &self,
788        partial_id: &str,
789    ) -> RustvelloResult<Vec<StoredRunnerContext>> {
790        let client = self.db.conn().await?;
791        let pattern = format!("%{partial_id}%");
792        let rows = client
793            .query(
794                "SELECT runner_id, runner_cls, pid, hostname, thread_id, started_at,
795                        parent_runner_id, parent_runner_cls
796                 FROM runner_contexts WHERE runner_id LIKE $1",
797                &[&pattern],
798            )
799            .await
800            .map_err(pg_err)?;
801        Ok(rows.iter().map(parse_pg_runner_row).collect())
802    }
803}
804
805fn parse_pg_runner_row(row: &tokio_postgres::Row) -> StoredRunnerContext {
806    StoredRunnerContext {
807        runner_id: row.get(0),
808        runner_cls: row.get(1),
809        pid: u32::try_from(row.get::<_, i32>(2)).unwrap_or(0),
810        hostname: row.get(3),
811        thread_id: u64::try_from(row.get::<_, i64>(4)).unwrap_or(0),
812        started_at: row.get(5),
813        parent_runner_id: row.get(6),
814        parent_runner_cls: row.get(7),
815    }
816}