Skip to main content

rustvello_mongo/
state_backend.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use mongodb::bson::doc;
5
6use rustvello_core::error::{RustvelloError, RustvelloResult, TaskError};
7use rustvello_core::state_backend::{
8    StateBackendCore, StateBackendQuery, StateBackendRunner, StoredRunnerContext,
9};
10use rustvello_proto::call::CallDTO;
11use rustvello_proto::identifiers::{CallId, InvocationId, TaskId};
12use rustvello_proto::invocation::{InvocationDTO, InvocationHistory, WorkflowIdentity};
13
14use crate::connection::{mongo_err, MongoPool};
15
16const INV_COL: &str = "state_invocations";
17const CALL_COL: &str = "state_calls";
18const RESULT_COL: &str = "state_results";
19const ERROR_COL: &str = "state_errors";
20const HISTORY_COL: &str = "state_history";
21const WF_RUNS_COL: &str = "state_workflow_runs";
22const WF_DATA_COL: &str = "state_workflow_data";
23const APP_INFOS_COL: &str = "state_app_infos";
24const WF_SUB_COL: &str = "state_workflow_sub_invocations";
25const RUNNER_CTX_COL: &str = "state_runner_contexts";
26
27/// MongoDB-backed state backend.
28///
29/// Uses separate collections for invocations, calls, results, errors,
30/// and history entries. Documents are stored as JSON strings within
31/// BSON documents keyed by their IDs.
32#[non_exhaustive]
33pub struct MongoStateBackend {
34    pool: Arc<MongoPool>,
35}
36
37impl MongoStateBackend {
38    pub fn new(pool: Arc<MongoPool>) -> Self {
39        Self { pool }
40    }
41}
42
43#[async_trait]
44impl StateBackendCore for MongoStateBackend {
45    async fn upsert_invocation(
46        &self,
47        invocation: &InvocationDTO,
48        call: &CallDTO,
49    ) -> RustvelloResult<()> {
50        let db = self.pool.db().await?;
51
52        let inv_json =
53            serde_json::to_string(invocation).map_err(|e| RustvelloError::Serialization {
54                message: e.to_string(),
55            })?;
56        let call_json = serde_json::to_string(call).map_err(|e| RustvelloError::Serialization {
57            message: e.to_string(),
58        })?;
59
60        // Upsert invocation
61        let inv_col = db.collection::<mongodb::bson::Document>(INV_COL);
62        let filter = doc! { "_id": invocation.invocation_id.to_string() };
63        let update = doc! {
64            "$set": {
65                "data": &inv_json,
66                "workflow_id": invocation.workflow.as_ref().map(|w| w.workflow_id.to_string()),
67                "parent_invocation_id": invocation.parent_invocation_id.as_ref().map(ToString::to_string),
68            }
69        };
70        inv_col
71            .update_one(filter, update)
72            .upsert(true)
73            .await
74            .map_err(mongo_err)?;
75
76        // Upsert call
77        let call_col = db.collection::<mongodb::bson::Document>(CALL_COL);
78        let filter = doc! { "_id": call.call_id.to_string() };
79        let update = doc! { "$set": { "data": &call_json } };
80        call_col
81            .update_one(filter, update)
82            .upsert(true)
83            .await
84            .map_err(mongo_err)?;
85
86        Ok(())
87    }
88
89    async fn get_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<InvocationDTO> {
90        let db = self.pool.db().await?;
91        let col = db.collection::<mongodb::bson::Document>(INV_COL);
92        let filter = doc! { "_id": invocation_id.to_string() };
93        let result = col.find_one(filter).await.map_err(mongo_err)?;
94        match result {
95            Some(d) => {
96                let s = d
97                    .get_str("data")
98                    .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
99                serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
100                    message: e.to_string(),
101                })
102            }
103            None => Err(RustvelloError::InvocationNotFound {
104                invocation_id: invocation_id.clone(),
105            }),
106        }
107    }
108
109    async fn get_call(&self, call_id: &CallId) -> RustvelloResult<CallDTO> {
110        let db = self.pool.db().await?;
111        let col = db.collection::<mongodb::bson::Document>(CALL_COL);
112        let filter = doc! { "_id": call_id.to_string() };
113        let result = col.find_one(filter).await.map_err(mongo_err)?;
114        match result {
115            Some(d) => {
116                let s = d
117                    .get_str("data")
118                    .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
119                serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
120                    message: e.to_string(),
121                })
122            }
123            None => Err(RustvelloError::state_backend(format!(
124                "call not found: {}",
125                call_id
126            ))),
127        }
128    }
129
130    async fn store_result(
131        &self,
132        invocation_id: &InvocationId,
133        result: &str,
134    ) -> RustvelloResult<()> {
135        let db = self.pool.db().await?;
136        let col = db.collection::<mongodb::bson::Document>(RESULT_COL);
137        let filter = doc! { "_id": invocation_id.to_string() };
138        let update = doc! { "$set": { "result": result } };
139        col.update_one(filter, update)
140            .upsert(true)
141            .await
142            .map_err(mongo_err)?;
143        Ok(())
144    }
145
146    async fn get_result(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<String>> {
147        let db = self.pool.db().await?;
148        let col = db.collection::<mongodb::bson::Document>(RESULT_COL);
149        let filter = doc! { "_id": invocation_id.to_string() };
150        let result = col.find_one(filter).await.map_err(mongo_err)?;
151        Ok(result.and_then(|d| d.get_str("result").ok().map(ToString::to_string)))
152    }
153
154    async fn store_error(
155        &self,
156        invocation_id: &InvocationId,
157        error: &TaskError,
158    ) -> RustvelloResult<()> {
159        let db = self.pool.db().await?;
160        let col = db.collection::<mongodb::bson::Document>(ERROR_COL);
161        let json = serde_json::to_string(error).map_err(|e| RustvelloError::Serialization {
162            message: e.to_string(),
163        })?;
164        let filter = doc! { "_id": invocation_id.to_string() };
165        let update = doc! { "$set": { "error": &json } };
166        col.update_one(filter, update)
167            .upsert(true)
168            .await
169            .map_err(mongo_err)?;
170        Ok(())
171    }
172
173    async fn get_error(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<TaskError>> {
174        let db = self.pool.db().await?;
175        let col = db.collection::<mongodb::bson::Document>(ERROR_COL);
176        let filter = doc! { "_id": invocation_id.to_string() };
177        let result = col.find_one(filter).await.map_err(mongo_err)?;
178        match result {
179            Some(d) => match d.get_str("error") {
180                Ok(s) => {
181                    let err: TaskError =
182                        serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
183                            message: e.to_string(),
184                        })?;
185                    Ok(Some(err))
186                }
187                Err(_) => Ok(None),
188            },
189            None => Ok(None),
190        }
191    }
192
193    async fn add_history(&self, history: &InvocationHistory) -> RustvelloResult<()> {
194        let db = self.pool.db().await?;
195        let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
196        let json = serde_json::to_string(history).map_err(|e| RustvelloError::Serialization {
197            message: e.to_string(),
198        })?;
199        let runner_id = history
200            .runner_id
201            .as_ref()
202            .or(history.status_record.runner_id.as_ref())
203            .map(|r| r.as_str().to_string());
204        let ts = history
205            .history_timestamp
206            .unwrap_or(history.status_record.timestamp);
207        let doc = doc! {
208            "invocation_id": history.invocation_id.to_string(),
209            "runner_id": runner_id,
210            "timestamp": mongodb::bson::DateTime::from_millis(ts.timestamp_millis()),
211            "data": &json,
212        };
213        col.insert_one(doc).await.map_err(mongo_err)?;
214        Ok(())
215    }
216
217    async fn get_history(
218        &self,
219        invocation_id: &InvocationId,
220    ) -> RustvelloResult<Vec<InvocationHistory>> {
221        let db = self.pool.db().await?;
222        let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
223        let filter = doc! { "invocation_id": invocation_id.to_string() };
224        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
225
226        let mut result = Vec::new();
227        use futures_util::StreamExt;
228        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
229            let d = doc_result.map_err(mongo_err)?;
230            if let Ok(s) = d.get_str("data") {
231                let h: InvocationHistory =
232                    serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
233                        message: e.to_string(),
234                    })?;
235                result.push(h);
236            }
237        }
238        Ok(result)
239    }
240
241    async fn purge(&self) -> RustvelloResult<()> {
242        let db = self.pool.db().await?;
243        for col_name in [
244            INV_COL,
245            CALL_COL,
246            RESULT_COL,
247            ERROR_COL,
248            HISTORY_COL,
249            WF_RUNS_COL,
250            WF_DATA_COL,
251            APP_INFOS_COL,
252            WF_SUB_COL,
253            RUNNER_CTX_COL,
254        ] {
255            let col = db.collection::<mongodb::bson::Document>(col_name);
256            col.delete_many(doc! {}).await.map_err(mongo_err)?;
257        }
258        Ok(())
259    }
260}
261
262#[async_trait]
263impl StateBackendQuery for MongoStateBackend {
264    async fn get_workflow_invocations(
265        &self,
266        workflow_id: &InvocationId,
267    ) -> RustvelloResult<Vec<InvocationId>> {
268        let db = self.pool.db().await?;
269        let col = db.collection::<mongodb::bson::Document>(INV_COL);
270        let filter = doc! { "workflow_id": workflow_id.to_string() };
271        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
272
273        let mut result = Vec::new();
274        use futures_util::StreamExt;
275        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
276            let d = doc_result.map_err(mongo_err)?;
277            if let Ok(id) = d.get_str("_id") {
278                result.push(InvocationId::from_string(id.to_string()));
279            }
280        }
281        Ok(result)
282    }
283
284    async fn get_child_invocations(
285        &self,
286        parent_invocation_id: &InvocationId,
287    ) -> RustvelloResult<Vec<InvocationId>> {
288        let db = self.pool.db().await?;
289        let col = db.collection::<mongodb::bson::Document>(INV_COL);
290        let filter = doc! { "parent_invocation_id": parent_invocation_id.to_string() };
291        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
292
293        let mut result = Vec::new();
294        use futures_util::StreamExt;
295        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
296            let d = doc_result.map_err(mongo_err)?;
297            if let Ok(id) = d.get_str("_id") {
298                result.push(InvocationId::from_string(id.to_string()));
299            }
300        }
301        Ok(result)
302    }
303
304    async fn store_workflow_run(&self, workflow: &WorkflowIdentity) -> RustvelloResult<()> {
305        let db = self.pool.db().await?;
306        let col = db.collection::<mongodb::bson::Document>(WF_RUNS_COL);
307        let filter = doc! { "_id": workflow.workflow_id.as_str() };
308        let update = doc! { "$set": {
309            "workflow_type": workflow.workflow_type.to_string(),
310            "parent_workflow_id": workflow.parent_id.as_ref().map(|id| id.as_str().to_string()),
311            "depth": workflow.depth as i32,
312        }};
313        col.update_one(filter, update)
314            .upsert(true)
315            .await
316            .map_err(mongo_err)?;
317        Ok(())
318    }
319
320    async fn get_all_workflow_types(&self) -> RustvelloResult<Vec<TaskId>> {
321        let db = self.pool.db().await?;
322        let col = db.collection::<mongodb::bson::Document>(WF_RUNS_COL);
323        let mut cursor = col.find(doc! {}).await.map_err(mongo_err)?;
324        let mut types = std::collections::HashSet::new();
325        use futures_util::StreamExt;
326        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
327            let d = doc_result.map_err(mongo_err)?;
328            if let Ok(t) = d.get_str("workflow_type") {
329                types.insert(t.to_string());
330            }
331        }
332        types
333            .into_iter()
334            .map(|s| {
335                s.parse::<TaskId>().map_err(|e| {
336                    RustvelloError::state_backend(format!("invalid task_id in database: {e}"))
337                })
338            })
339            .collect()
340    }
341
342    async fn get_workflow_runs(
343        &self,
344        workflow_type: &TaskId,
345    ) -> RustvelloResult<Vec<WorkflowIdentity>> {
346        let db = self.pool.db().await?;
347        let col = db.collection::<mongodb::bson::Document>(WF_RUNS_COL);
348        let filter = doc! { "workflow_type": workflow_type.to_string() };
349        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
350        let mut result = Vec::new();
351        use futures_util::StreamExt;
352        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
353            let d = doc_result.map_err(mongo_err)?;
354            let wf_id = d
355                .get_str("_id")
356                .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
357            let wf_type = d
358                .get_str("workflow_type")
359                .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
360            let parent_id = d
361                .get_str("parent_workflow_id")
362                .ok()
363                .map(std::string::ToString::to_string);
364            let depth = d.get_i32("depth").unwrap_or(0);
365            let task_id = wf_type.parse::<TaskId>().map_err(|e| {
366                RustvelloError::state_backend(format!("invalid workflow task_id in database: {e}"))
367            })?;
368            result.push(WorkflowIdentity {
369                workflow_id: InvocationId::from_string(wf_id.to_string()),
370                workflow_type: task_id,
371                parent_id: parent_id.map(InvocationId::from_string),
372                depth: u32::try_from(depth).unwrap_or(0),
373            });
374        }
375        Ok(result)
376    }
377
378    async fn set_workflow_data(
379        &self,
380        workflow_id: &InvocationId,
381        key: &str,
382        value: &str,
383    ) -> RustvelloResult<()> {
384        let db = self.pool.db().await?;
385        let col = db.collection::<mongodb::bson::Document>(WF_DATA_COL);
386        let doc_id = format!("{}:{}", workflow_id.as_str(), key);
387        let filter = doc! { "_id": &doc_id };
388        let update =
389            doc! { "$set": { "workflow_id": workflow_id.as_str(), "key": key, "value": value } };
390        col.update_one(filter, update)
391            .upsert(true)
392            .await
393            .map_err(mongo_err)?;
394        Ok(())
395    }
396
397    async fn get_workflow_data(
398        &self,
399        workflow_id: &InvocationId,
400        key: &str,
401    ) -> RustvelloResult<Option<String>> {
402        let db = self.pool.db().await?;
403        let col = db.collection::<mongodb::bson::Document>(WF_DATA_COL);
404        let doc_id = format!("{}:{}", workflow_id.as_str(), key);
405        let filter = doc! { "_id": &doc_id };
406        let result = col.find_one(filter).await.map_err(mongo_err)?;
407        Ok(result.and_then(|d| d.get_str("value").ok().map(ToString::to_string)))
408    }
409
410    async fn store_app_info(&self, app_id: &str, info_json: &str) -> RustvelloResult<()> {
411        let db = self.pool.db().await?;
412        let col = db.collection::<mongodb::bson::Document>(APP_INFOS_COL);
413        let filter = doc! { "_id": app_id };
414        let update = doc! { "$set": { "info_json": info_json } };
415        col.update_one(filter, update)
416            .upsert(true)
417            .await
418            .map_err(mongo_err)?;
419        Ok(())
420    }
421
422    async fn get_app_info(&self, app_id: &str) -> RustvelloResult<Option<String>> {
423        let db = self.pool.db().await?;
424        let col = db.collection::<mongodb::bson::Document>(APP_INFOS_COL);
425        let filter = doc! { "_id": app_id };
426        let result = col.find_one(filter).await.map_err(mongo_err)?;
427        Ok(result.and_then(|d| d.get_str("info_json").ok().map(ToString::to_string)))
428    }
429
430    async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>> {
431        let db = self.pool.db().await?;
432        let col = db.collection::<mongodb::bson::Document>(APP_INFOS_COL);
433        let mut cursor = col.find(doc! {}).await.map_err(mongo_err)?;
434        let mut result = Vec::new();
435        use futures_util::StreamExt;
436        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
437            let d = doc_result.map_err(mongo_err)?;
438            if let (Ok(app_id), Ok(info)) = (d.get_str("_id"), d.get_str("info_json")) {
439                result.push((app_id.to_string(), info.to_string()));
440            }
441        }
442        Ok(result)
443    }
444
445    async fn store_workflow_sub_invocation(
446        &self,
447        workflow_id: &InvocationId,
448        sub_inv_id: &InvocationId,
449    ) -> RustvelloResult<()> {
450        let db = self.pool.db().await?;
451        let col = db.collection::<mongodb::bson::Document>(WF_SUB_COL);
452        let doc_id = format!("{}:{}", workflow_id.as_str(), sub_inv_id.as_str());
453        let filter = doc! { "_id": &doc_id };
454        let update = doc! { "$set": {
455            "workflow_id": workflow_id.as_str(),
456            "sub_invocation_id": sub_inv_id.as_str(),
457        }};
458        col.update_one(filter, update)
459            .upsert(true)
460            .await
461            .map_err(mongo_err)?;
462        Ok(())
463    }
464
465    async fn get_workflow_sub_invocations(
466        &self,
467        workflow_id: &InvocationId,
468    ) -> RustvelloResult<Vec<InvocationId>> {
469        let db = self.pool.db().await?;
470        let col = db.collection::<mongodb::bson::Document>(WF_SUB_COL);
471        let filter = doc! { "workflow_id": workflow_id.as_str() };
472        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
473        let mut result = Vec::new();
474        use futures_util::StreamExt;
475        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
476            let d = doc_result.map_err(mongo_err)?;
477            if let Ok(id) = d.get_str("sub_invocation_id") {
478                result.push(InvocationId::from_string(id.to_string()));
479            }
480        }
481        Ok(result)
482    }
483
484    async fn get_all_workflow_runs(&self) -> RustvelloResult<Vec<WorkflowIdentity>> {
485        let db = self.pool.db().await?;
486        let col = db.collection::<mongodb::bson::Document>(WF_RUNS_COL);
487        let mut cursor = col.find(doc! {}).await.map_err(mongo_err)?;
488        let mut result = Vec::new();
489        use futures_util::StreamExt;
490        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
491            let d = doc_result.map_err(mongo_err)?;
492            let wf_id = d.get_str("_id").unwrap_or_default().to_string();
493            let wf_type_str = d.get_str("workflow_type").unwrap_or_default();
494            let task_id = wf_type_str.parse::<TaskId>().map_err(|e| {
495                RustvelloError::state_backend(format!("invalid workflow task_id in database: {e}"))
496            })?;
497            let parent_id = d
498                .get_str("parent_workflow_id")
499                .ok()
500                .filter(|s| !s.is_empty())
501                .map(|s| InvocationId::from_string(s.to_string()));
502            let depth = u32::try_from(d.get_i32("depth").unwrap_or(0)).unwrap_or(0);
503            result.push(WorkflowIdentity {
504                workflow_id: InvocationId::from_string(wf_id),
505                workflow_type: task_id,
506                parent_id,
507                depth,
508            });
509        }
510        Ok(result)
511    }
512}
513
514#[async_trait]
515impl StateBackendRunner for MongoStateBackend {
516    async fn store_runner_context(&self, context: &StoredRunnerContext) -> RustvelloResult<()> {
517        let db = self.pool.db().await?;
518        let col = db.collection::<mongodb::bson::Document>(RUNNER_CTX_COL);
519        let json = serde_json::to_string(context).map_err(|e| RustvelloError::Serialization {
520            message: e.to_string(),
521        })?;
522        let filter = doc! { "_id": &context.runner_id };
523        let update = doc! { "$set": {
524            "data": &json,
525            "parent_runner_id": &context.parent_runner_id,
526        }};
527        col.update_one(filter, update)
528            .upsert(true)
529            .await
530            .map_err(mongo_err)?;
531        Ok(())
532    }
533
534    async fn get_runner_context(
535        &self,
536        runner_id: &str,
537    ) -> RustvelloResult<Option<StoredRunnerContext>> {
538        let db = self.pool.db().await?;
539        let col = db.collection::<mongodb::bson::Document>(RUNNER_CTX_COL);
540        let filter = doc! { "_id": runner_id };
541        let result = col.find_one(filter).await.map_err(mongo_err)?;
542        match result {
543            Some(d) => {
544                let s = d
545                    .get_str("data")
546                    .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
547                let ctx: StoredRunnerContext =
548                    serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
549                        message: e.to_string(),
550                    })?;
551                Ok(Some(ctx))
552            }
553            None => Ok(None),
554        }
555    }
556
557    async fn get_runner_contexts_by_parent(
558        &self,
559        parent_runner_id: &str,
560    ) -> RustvelloResult<Vec<StoredRunnerContext>> {
561        let db = self.pool.db().await?;
562        let col = db.collection::<mongodb::bson::Document>(RUNNER_CTX_COL);
563        let filter = doc! { "parent_runner_id": parent_runner_id };
564        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
565        let mut result = Vec::new();
566        use futures_util::StreamExt;
567        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
568            let d = doc_result.map_err(mongo_err)?;
569            if let Ok(s) = d.get_str("data") {
570                let ctx: StoredRunnerContext =
571                    serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
572                        message: e.to_string(),
573                    })?;
574                result.push(ctx);
575            }
576        }
577        Ok(result)
578    }
579
580    async fn get_invocation_ids_by_runner(
581        &self,
582        runner_id: &str,
583        limit: usize,
584        offset: usize,
585    ) -> RustvelloResult<Vec<InvocationId>> {
586        let db = self.pool.db().await?;
587        let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
588        let filter = doc! { "runner_id": runner_id };
589        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
590        let mut seen = std::collections::HashSet::new();
591        let mut result = Vec::new();
592        use futures_util::StreamExt;
593        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
594            let d = doc_result.map_err(mongo_err)?;
595            if let Ok(inv_id) = d.get_str("invocation_id") {
596                if seen.insert(inv_id.to_string()) {
597                    result.push(InvocationId::from_string(inv_id.to_string()));
598                }
599            }
600        }
601        let iter = result.into_iter().skip(offset);
602        let ids: Vec<InvocationId> = if limit > 0 {
603            iter.take(limit).collect()
604        } else {
605            iter.collect()
606        };
607        Ok(ids)
608    }
609
610    async fn count_invocations_by_runner(&self, runner_id: &str) -> RustvelloResult<usize> {
611        let db = self.pool.db().await?;
612        let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
613        let filter = doc! { "runner_id": runner_id };
614        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
615        let mut seen = std::collections::HashSet::new();
616        use futures_util::StreamExt;
617        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
618            let d = doc_result.map_err(mongo_err)?;
619            if let Ok(inv_id) = d.get_str("invocation_id") {
620                seen.insert(inv_id.to_string());
621            }
622        }
623        Ok(seen.len())
624    }
625
626    async fn get_history_in_timerange(
627        &self,
628        start: chrono::DateTime<chrono::Utc>,
629        end: chrono::DateTime<chrono::Utc>,
630        limit: usize,
631        offset: usize,
632    ) -> RustvelloResult<Vec<InvocationHistory>> {
633        let db = self.pool.db().await?;
634        let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
635        let start_bson = mongodb::bson::DateTime::from_millis(start.timestamp_millis());
636        let end_bson = mongodb::bson::DateTime::from_millis(end.timestamp_millis());
637        let filter = doc! {
638            "timestamp": { "$gte": start_bson, "$lte": end_bson }
639        };
640        let opts = mongodb::options::FindOptions::builder()
641            .sort(doc! { "timestamp": 1 })
642            .skip(Some(offset as u64))
643            .limit(if limit > 0 { Some(limit as i64) } else { None })
644            .build();
645        let mut cursor = col
646            .find(filter)
647            .with_options(opts)
648            .await
649            .map_err(mongo_err)?;
650        let mut result = Vec::new();
651        use futures_util::StreamExt;
652        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
653            let d = doc_result.map_err(mongo_err)?;
654            if let Ok(s) = d.get_str("data") {
655                let h: InvocationHistory =
656                    serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
657                        message: e.to_string(),
658                    })?;
659                result.push(h);
660            }
661        }
662        Ok(result)
663    }
664
665    async fn get_matching_runner_contexts(
666        &self,
667        partial_id: &str,
668    ) -> RustvelloResult<Vec<StoredRunnerContext>> {
669        let db = self.pool.db().await?;
670        let col = db.collection::<mongodb::bson::Document>(RUNNER_CTX_COL);
671        let filter = doc! { "_id": { "$regex": partial_id } };
672        let mut cursor = col.find(filter).await.map_err(mongo_err)?;
673        let mut result = Vec::new();
674        use futures_util::StreamExt;
675        while let Some(doc_result) = StreamExt::next(&mut cursor).await {
676            let d = doc_result.map_err(mongo_err)?;
677            if let Ok(s) = d.get_str("data") {
678                let ctx: StoredRunnerContext =
679                    serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
680                        message: e.to_string(),
681                    })?;
682                result.push(ctx);
683            }
684        }
685        Ok(result)
686    }
687}