1use std::sync::Arc;
2
3use async_trait::async_trait;
4use mongodb::bson::doc;
5
6use rustvello_core::error::{RustvelloError, RustvelloResult, TaskError};
7use rustvello_core::state_backend::{
8 StateBackendCore, StateBackendQuery, StateBackendRunner, StoredRunnerContext,
9};
10use rustvello_proto::call::CallDTO;
11use rustvello_proto::identifiers::{CallId, InvocationId, TaskId};
12use rustvello_proto::invocation::{InvocationDTO, InvocationHistory, WorkflowIdentity};
13
14use crate::connection::{mongo_err, MongoPool};
15
// Names of the MongoDB collections this backend reads and writes.

/// Invocation documents: serialized invocation plus `workflow_id` and
/// `parent_invocation_id` fields used for lookups.
const INV_COL: &str = "state_invocations";
/// Call documents: serialized call keyed by call id.
const CALL_COL: &str = "state_calls";
/// Result strings keyed by invocation id.
const RESULT_COL: &str = "state_results";
/// Serialized task errors keyed by invocation id.
const ERROR_COL: &str = "state_errors";
/// History entries (invocation id, runner id, timestamp, serialized entry).
const HISTORY_COL: &str = "state_history";
/// Workflow runs keyed by workflow id (type, parent workflow id, depth).
const WF_RUNS_COL: &str = "state_workflow_runs";
/// Per-workflow key/value data.
const WF_DATA_COL: &str = "state_workflow_data";
/// Application info JSON keyed by app id.
const APP_INFOS_COL: &str = "state_app_infos";
/// Workflow-to-sub-invocation links.
const WF_SUB_COL: &str = "state_workflow_sub_invocations";
/// Serialized runner contexts keyed by runner id.
const RUNNER_CTX_COL: &str = "state_runner_contexts";
26
/// State backend that stores its data in MongoDB collections.
///
/// Every operation obtains a database handle from the shared [`MongoPool`].
#[non_exhaustive]
pub struct MongoStateBackend {
    /// Shared pool from which database handles are obtained.
    pool: Arc<MongoPool>,
}
36
37impl MongoStateBackend {
38 pub fn new(pool: Arc<MongoPool>) -> Self {
39 Self { pool }
40 }
41}
42
43#[async_trait]
44impl StateBackendCore for MongoStateBackend {
45 async fn upsert_invocation(
46 &self,
47 invocation: &InvocationDTO,
48 call: &CallDTO,
49 ) -> RustvelloResult<()> {
50 let db = self.pool.db().await?;
51
52 let inv_json =
53 serde_json::to_string(invocation).map_err(|e| RustvelloError::Serialization {
54 message: e.to_string(),
55 })?;
56 let call_json = serde_json::to_string(call).map_err(|e| RustvelloError::Serialization {
57 message: e.to_string(),
58 })?;
59
60 let inv_col = db.collection::<mongodb::bson::Document>(INV_COL);
62 let filter = doc! { "_id": invocation.invocation_id.to_string() };
63 let update = doc! {
64 "$set": {
65 "data": &inv_json,
66 "workflow_id": invocation.workflow.as_ref().map(|w| w.workflow_id.to_string()),
67 "parent_invocation_id": invocation.parent_invocation_id.as_ref().map(ToString::to_string),
68 }
69 };
70 inv_col
71 .update_one(filter, update)
72 .upsert(true)
73 .await
74 .map_err(mongo_err)?;
75
76 let call_col = db.collection::<mongodb::bson::Document>(CALL_COL);
78 let filter = doc! { "_id": call.call_id.to_string() };
79 let update = doc! { "$set": { "data": &call_json } };
80 call_col
81 .update_one(filter, update)
82 .upsert(true)
83 .await
84 .map_err(mongo_err)?;
85
86 Ok(())
87 }
88
89 async fn get_invocation(&self, invocation_id: &InvocationId) -> RustvelloResult<InvocationDTO> {
90 let db = self.pool.db().await?;
91 let col = db.collection::<mongodb::bson::Document>(INV_COL);
92 let filter = doc! { "_id": invocation_id.to_string() };
93 let result = col.find_one(filter).await.map_err(mongo_err)?;
94 match result {
95 Some(d) => {
96 let s = d
97 .get_str("data")
98 .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
99 serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
100 message: e.to_string(),
101 })
102 }
103 None => Err(RustvelloError::InvocationNotFound {
104 invocation_id: invocation_id.clone(),
105 }),
106 }
107 }
108
109 async fn get_call(&self, call_id: &CallId) -> RustvelloResult<CallDTO> {
110 let db = self.pool.db().await?;
111 let col = db.collection::<mongodb::bson::Document>(CALL_COL);
112 let filter = doc! { "_id": call_id.to_string() };
113 let result = col.find_one(filter).await.map_err(mongo_err)?;
114 match result {
115 Some(d) => {
116 let s = d
117 .get_str("data")
118 .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
119 serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
120 message: e.to_string(),
121 })
122 }
123 None => Err(RustvelloError::state_backend(format!(
124 "call not found: {}",
125 call_id
126 ))),
127 }
128 }
129
130 async fn store_result(
131 &self,
132 invocation_id: &InvocationId,
133 result: &str,
134 ) -> RustvelloResult<()> {
135 let db = self.pool.db().await?;
136 let col = db.collection::<mongodb::bson::Document>(RESULT_COL);
137 let filter = doc! { "_id": invocation_id.to_string() };
138 let update = doc! { "$set": { "result": result } };
139 col.update_one(filter, update)
140 .upsert(true)
141 .await
142 .map_err(mongo_err)?;
143 Ok(())
144 }
145
146 async fn get_result(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<String>> {
147 let db = self.pool.db().await?;
148 let col = db.collection::<mongodb::bson::Document>(RESULT_COL);
149 let filter = doc! { "_id": invocation_id.to_string() };
150 let result = col.find_one(filter).await.map_err(mongo_err)?;
151 Ok(result.and_then(|d| d.get_str("result").ok().map(ToString::to_string)))
152 }
153
154 async fn store_error(
155 &self,
156 invocation_id: &InvocationId,
157 error: &TaskError,
158 ) -> RustvelloResult<()> {
159 let db = self.pool.db().await?;
160 let col = db.collection::<mongodb::bson::Document>(ERROR_COL);
161 let json = serde_json::to_string(error).map_err(|e| RustvelloError::Serialization {
162 message: e.to_string(),
163 })?;
164 let filter = doc! { "_id": invocation_id.to_string() };
165 let update = doc! { "$set": { "error": &json } };
166 col.update_one(filter, update)
167 .upsert(true)
168 .await
169 .map_err(mongo_err)?;
170 Ok(())
171 }
172
173 async fn get_error(&self, invocation_id: &InvocationId) -> RustvelloResult<Option<TaskError>> {
174 let db = self.pool.db().await?;
175 let col = db.collection::<mongodb::bson::Document>(ERROR_COL);
176 let filter = doc! { "_id": invocation_id.to_string() };
177 let result = col.find_one(filter).await.map_err(mongo_err)?;
178 match result {
179 Some(d) => match d.get_str("error") {
180 Ok(s) => {
181 let err: TaskError =
182 serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
183 message: e.to_string(),
184 })?;
185 Ok(Some(err))
186 }
187 Err(_) => Ok(None),
188 },
189 None => Ok(None),
190 }
191 }
192
193 async fn add_history(&self, history: &InvocationHistory) -> RustvelloResult<()> {
194 let db = self.pool.db().await?;
195 let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
196 let json = serde_json::to_string(history).map_err(|e| RustvelloError::Serialization {
197 message: e.to_string(),
198 })?;
199 let runner_id = history
200 .runner_id
201 .as_ref()
202 .or(history.status_record.runner_id.as_ref())
203 .map(|r| r.as_str().to_string());
204 let ts = history
205 .history_timestamp
206 .unwrap_or(history.status_record.timestamp);
207 let doc = doc! {
208 "invocation_id": history.invocation_id.to_string(),
209 "runner_id": runner_id,
210 "timestamp": mongodb::bson::DateTime::from_millis(ts.timestamp_millis()),
211 "data": &json,
212 };
213 col.insert_one(doc).await.map_err(mongo_err)?;
214 Ok(())
215 }
216
217 async fn get_history(
218 &self,
219 invocation_id: &InvocationId,
220 ) -> RustvelloResult<Vec<InvocationHistory>> {
221 let db = self.pool.db().await?;
222 let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
223 let filter = doc! { "invocation_id": invocation_id.to_string() };
224 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
225
226 let mut result = Vec::new();
227 use futures_util::StreamExt;
228 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
229 let d = doc_result.map_err(mongo_err)?;
230 if let Ok(s) = d.get_str("data") {
231 let h: InvocationHistory =
232 serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
233 message: e.to_string(),
234 })?;
235 result.push(h);
236 }
237 }
238 Ok(result)
239 }
240
241 async fn purge(&self) -> RustvelloResult<()> {
242 let db = self.pool.db().await?;
243 for col_name in [
244 INV_COL,
245 CALL_COL,
246 RESULT_COL,
247 ERROR_COL,
248 HISTORY_COL,
249 WF_RUNS_COL,
250 WF_DATA_COL,
251 APP_INFOS_COL,
252 WF_SUB_COL,
253 RUNNER_CTX_COL,
254 ] {
255 let col = db.collection::<mongodb::bson::Document>(col_name);
256 col.delete_many(doc! {}).await.map_err(mongo_err)?;
257 }
258 Ok(())
259 }
260}
261
262#[async_trait]
263impl StateBackendQuery for MongoStateBackend {
264 async fn get_workflow_invocations(
265 &self,
266 workflow_id: &InvocationId,
267 ) -> RustvelloResult<Vec<InvocationId>> {
268 let db = self.pool.db().await?;
269 let col = db.collection::<mongodb::bson::Document>(INV_COL);
270 let filter = doc! { "workflow_id": workflow_id.to_string() };
271 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
272
273 let mut result = Vec::new();
274 use futures_util::StreamExt;
275 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
276 let d = doc_result.map_err(mongo_err)?;
277 if let Ok(id) = d.get_str("_id") {
278 result.push(InvocationId::from_string(id.to_string()));
279 }
280 }
281 Ok(result)
282 }
283
284 async fn get_child_invocations(
285 &self,
286 parent_invocation_id: &InvocationId,
287 ) -> RustvelloResult<Vec<InvocationId>> {
288 let db = self.pool.db().await?;
289 let col = db.collection::<mongodb::bson::Document>(INV_COL);
290 let filter = doc! { "parent_invocation_id": parent_invocation_id.to_string() };
291 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
292
293 let mut result = Vec::new();
294 use futures_util::StreamExt;
295 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
296 let d = doc_result.map_err(mongo_err)?;
297 if let Ok(id) = d.get_str("_id") {
298 result.push(InvocationId::from_string(id.to_string()));
299 }
300 }
301 Ok(result)
302 }
303
304 async fn store_workflow_run(&self, workflow: &WorkflowIdentity) -> RustvelloResult<()> {
305 let db = self.pool.db().await?;
306 let col = db.collection::<mongodb::bson::Document>(WF_RUNS_COL);
307 let filter = doc! { "_id": workflow.workflow_id.as_str() };
308 let update = doc! { "$set": {
309 "workflow_type": workflow.workflow_type.to_string(),
310 "parent_workflow_id": workflow.parent_id.as_ref().map(|id| id.as_str().to_string()),
311 "depth": workflow.depth as i32,
312 }};
313 col.update_one(filter, update)
314 .upsert(true)
315 .await
316 .map_err(mongo_err)?;
317 Ok(())
318 }
319
320 async fn get_all_workflow_types(&self) -> RustvelloResult<Vec<TaskId>> {
321 let db = self.pool.db().await?;
322 let col = db.collection::<mongodb::bson::Document>(WF_RUNS_COL);
323 let mut cursor = col.find(doc! {}).await.map_err(mongo_err)?;
324 let mut types = std::collections::HashSet::new();
325 use futures_util::StreamExt;
326 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
327 let d = doc_result.map_err(mongo_err)?;
328 if let Ok(t) = d.get_str("workflow_type") {
329 types.insert(t.to_string());
330 }
331 }
332 types
333 .into_iter()
334 .map(|s| {
335 s.parse::<TaskId>().map_err(|e| {
336 RustvelloError::state_backend(format!("invalid task_id in database: {e}"))
337 })
338 })
339 .collect()
340 }
341
342 async fn get_workflow_runs(
343 &self,
344 workflow_type: &TaskId,
345 ) -> RustvelloResult<Vec<WorkflowIdentity>> {
346 let db = self.pool.db().await?;
347 let col = db.collection::<mongodb::bson::Document>(WF_RUNS_COL);
348 let filter = doc! { "workflow_type": workflow_type.to_string() };
349 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
350 let mut result = Vec::new();
351 use futures_util::StreamExt;
352 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
353 let d = doc_result.map_err(mongo_err)?;
354 let wf_id = d
355 .get_str("_id")
356 .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
357 let wf_type = d
358 .get_str("workflow_type")
359 .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
360 let parent_id = d
361 .get_str("parent_workflow_id")
362 .ok()
363 .map(std::string::ToString::to_string);
364 let depth = d.get_i32("depth").unwrap_or(0);
365 let task_id = wf_type.parse::<TaskId>().map_err(|e| {
366 RustvelloError::state_backend(format!("invalid workflow task_id in database: {e}"))
367 })?;
368 result.push(WorkflowIdentity {
369 workflow_id: InvocationId::from_string(wf_id.to_string()),
370 workflow_type: task_id,
371 parent_id: parent_id.map(InvocationId::from_string),
372 depth: u32::try_from(depth).unwrap_or(0),
373 });
374 }
375 Ok(result)
376 }
377
378 async fn set_workflow_data(
379 &self,
380 workflow_id: &InvocationId,
381 key: &str,
382 value: &str,
383 ) -> RustvelloResult<()> {
384 let db = self.pool.db().await?;
385 let col = db.collection::<mongodb::bson::Document>(WF_DATA_COL);
386 let doc_id = format!("{}:{}", workflow_id.as_str(), key);
387 let filter = doc! { "_id": &doc_id };
388 let update =
389 doc! { "$set": { "workflow_id": workflow_id.as_str(), "key": key, "value": value } };
390 col.update_one(filter, update)
391 .upsert(true)
392 .await
393 .map_err(mongo_err)?;
394 Ok(())
395 }
396
397 async fn get_workflow_data(
398 &self,
399 workflow_id: &InvocationId,
400 key: &str,
401 ) -> RustvelloResult<Option<String>> {
402 let db = self.pool.db().await?;
403 let col = db.collection::<mongodb::bson::Document>(WF_DATA_COL);
404 let doc_id = format!("{}:{}", workflow_id.as_str(), key);
405 let filter = doc! { "_id": &doc_id };
406 let result = col.find_one(filter).await.map_err(mongo_err)?;
407 Ok(result.and_then(|d| d.get_str("value").ok().map(ToString::to_string)))
408 }
409
410 async fn store_app_info(&self, app_id: &str, info_json: &str) -> RustvelloResult<()> {
411 let db = self.pool.db().await?;
412 let col = db.collection::<mongodb::bson::Document>(APP_INFOS_COL);
413 let filter = doc! { "_id": app_id };
414 let update = doc! { "$set": { "info_json": info_json } };
415 col.update_one(filter, update)
416 .upsert(true)
417 .await
418 .map_err(mongo_err)?;
419 Ok(())
420 }
421
422 async fn get_app_info(&self, app_id: &str) -> RustvelloResult<Option<String>> {
423 let db = self.pool.db().await?;
424 let col = db.collection::<mongodb::bson::Document>(APP_INFOS_COL);
425 let filter = doc! { "_id": app_id };
426 let result = col.find_one(filter).await.map_err(mongo_err)?;
427 Ok(result.and_then(|d| d.get_str("info_json").ok().map(ToString::to_string)))
428 }
429
430 async fn get_all_app_infos(&self) -> RustvelloResult<Vec<(String, String)>> {
431 let db = self.pool.db().await?;
432 let col = db.collection::<mongodb::bson::Document>(APP_INFOS_COL);
433 let mut cursor = col.find(doc! {}).await.map_err(mongo_err)?;
434 let mut result = Vec::new();
435 use futures_util::StreamExt;
436 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
437 let d = doc_result.map_err(mongo_err)?;
438 if let (Ok(app_id), Ok(info)) = (d.get_str("_id"), d.get_str("info_json")) {
439 result.push((app_id.to_string(), info.to_string()));
440 }
441 }
442 Ok(result)
443 }
444
445 async fn store_workflow_sub_invocation(
446 &self,
447 workflow_id: &InvocationId,
448 sub_inv_id: &InvocationId,
449 ) -> RustvelloResult<()> {
450 let db = self.pool.db().await?;
451 let col = db.collection::<mongodb::bson::Document>(WF_SUB_COL);
452 let doc_id = format!("{}:{}", workflow_id.as_str(), sub_inv_id.as_str());
453 let filter = doc! { "_id": &doc_id };
454 let update = doc! { "$set": {
455 "workflow_id": workflow_id.as_str(),
456 "sub_invocation_id": sub_inv_id.as_str(),
457 }};
458 col.update_one(filter, update)
459 .upsert(true)
460 .await
461 .map_err(mongo_err)?;
462 Ok(())
463 }
464
465 async fn get_workflow_sub_invocations(
466 &self,
467 workflow_id: &InvocationId,
468 ) -> RustvelloResult<Vec<InvocationId>> {
469 let db = self.pool.db().await?;
470 let col = db.collection::<mongodb::bson::Document>(WF_SUB_COL);
471 let filter = doc! { "workflow_id": workflow_id.as_str() };
472 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
473 let mut result = Vec::new();
474 use futures_util::StreamExt;
475 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
476 let d = doc_result.map_err(mongo_err)?;
477 if let Ok(id) = d.get_str("sub_invocation_id") {
478 result.push(InvocationId::from_string(id.to_string()));
479 }
480 }
481 Ok(result)
482 }
483
484 async fn get_all_workflow_runs(&self) -> RustvelloResult<Vec<WorkflowIdentity>> {
485 let db = self.pool.db().await?;
486 let col = db.collection::<mongodb::bson::Document>(WF_RUNS_COL);
487 let mut cursor = col.find(doc! {}).await.map_err(mongo_err)?;
488 let mut result = Vec::new();
489 use futures_util::StreamExt;
490 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
491 let d = doc_result.map_err(mongo_err)?;
492 let wf_id = d.get_str("_id").unwrap_or_default().to_string();
493 let wf_type_str = d.get_str("workflow_type").unwrap_or_default();
494 let task_id = wf_type_str.parse::<TaskId>().map_err(|e| {
495 RustvelloError::state_backend(format!("invalid workflow task_id in database: {e}"))
496 })?;
497 let parent_id = d
498 .get_str("parent_workflow_id")
499 .ok()
500 .filter(|s| !s.is_empty())
501 .map(|s| InvocationId::from_string(s.to_string()));
502 let depth = u32::try_from(d.get_i32("depth").unwrap_or(0)).unwrap_or(0);
503 result.push(WorkflowIdentity {
504 workflow_id: InvocationId::from_string(wf_id),
505 workflow_type: task_id,
506 parent_id,
507 depth,
508 });
509 }
510 Ok(result)
511 }
512}
513
514#[async_trait]
515impl StateBackendRunner for MongoStateBackend {
516 async fn store_runner_context(&self, context: &StoredRunnerContext) -> RustvelloResult<()> {
517 let db = self.pool.db().await?;
518 let col = db.collection::<mongodb::bson::Document>(RUNNER_CTX_COL);
519 let json = serde_json::to_string(context).map_err(|e| RustvelloError::Serialization {
520 message: e.to_string(),
521 })?;
522 let filter = doc! { "_id": &context.runner_id };
523 let update = doc! { "$set": {
524 "data": &json,
525 "parent_runner_id": &context.parent_runner_id,
526 }};
527 col.update_one(filter, update)
528 .upsert(true)
529 .await
530 .map_err(mongo_err)?;
531 Ok(())
532 }
533
534 async fn get_runner_context(
535 &self,
536 runner_id: &str,
537 ) -> RustvelloResult<Option<StoredRunnerContext>> {
538 let db = self.pool.db().await?;
539 let col = db.collection::<mongodb::bson::Document>(RUNNER_CTX_COL);
540 let filter = doc! { "_id": runner_id };
541 let result = col.find_one(filter).await.map_err(mongo_err)?;
542 match result {
543 Some(d) => {
544 let s = d
545 .get_str("data")
546 .map_err(|e| RustvelloError::state_backend(e.to_string()))?;
547 let ctx: StoredRunnerContext =
548 serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
549 message: e.to_string(),
550 })?;
551 Ok(Some(ctx))
552 }
553 None => Ok(None),
554 }
555 }
556
557 async fn get_runner_contexts_by_parent(
558 &self,
559 parent_runner_id: &str,
560 ) -> RustvelloResult<Vec<StoredRunnerContext>> {
561 let db = self.pool.db().await?;
562 let col = db.collection::<mongodb::bson::Document>(RUNNER_CTX_COL);
563 let filter = doc! { "parent_runner_id": parent_runner_id };
564 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
565 let mut result = Vec::new();
566 use futures_util::StreamExt;
567 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
568 let d = doc_result.map_err(mongo_err)?;
569 if let Ok(s) = d.get_str("data") {
570 let ctx: StoredRunnerContext =
571 serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
572 message: e.to_string(),
573 })?;
574 result.push(ctx);
575 }
576 }
577 Ok(result)
578 }
579
580 async fn get_invocation_ids_by_runner(
581 &self,
582 runner_id: &str,
583 limit: usize,
584 offset: usize,
585 ) -> RustvelloResult<Vec<InvocationId>> {
586 let db = self.pool.db().await?;
587 let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
588 let filter = doc! { "runner_id": runner_id };
589 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
590 let mut seen = std::collections::HashSet::new();
591 let mut result = Vec::new();
592 use futures_util::StreamExt;
593 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
594 let d = doc_result.map_err(mongo_err)?;
595 if let Ok(inv_id) = d.get_str("invocation_id") {
596 if seen.insert(inv_id.to_string()) {
597 result.push(InvocationId::from_string(inv_id.to_string()));
598 }
599 }
600 }
601 let iter = result.into_iter().skip(offset);
602 let ids: Vec<InvocationId> = if limit > 0 {
603 iter.take(limit).collect()
604 } else {
605 iter.collect()
606 };
607 Ok(ids)
608 }
609
610 async fn count_invocations_by_runner(&self, runner_id: &str) -> RustvelloResult<usize> {
611 let db = self.pool.db().await?;
612 let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
613 let filter = doc! { "runner_id": runner_id };
614 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
615 let mut seen = std::collections::HashSet::new();
616 use futures_util::StreamExt;
617 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
618 let d = doc_result.map_err(mongo_err)?;
619 if let Ok(inv_id) = d.get_str("invocation_id") {
620 seen.insert(inv_id.to_string());
621 }
622 }
623 Ok(seen.len())
624 }
625
626 async fn get_history_in_timerange(
627 &self,
628 start: chrono::DateTime<chrono::Utc>,
629 end: chrono::DateTime<chrono::Utc>,
630 limit: usize,
631 offset: usize,
632 ) -> RustvelloResult<Vec<InvocationHistory>> {
633 let db = self.pool.db().await?;
634 let col = db.collection::<mongodb::bson::Document>(HISTORY_COL);
635 let start_bson = mongodb::bson::DateTime::from_millis(start.timestamp_millis());
636 let end_bson = mongodb::bson::DateTime::from_millis(end.timestamp_millis());
637 let filter = doc! {
638 "timestamp": { "$gte": start_bson, "$lte": end_bson }
639 };
640 let opts = mongodb::options::FindOptions::builder()
641 .sort(doc! { "timestamp": 1 })
642 .skip(Some(offset as u64))
643 .limit(if limit > 0 { Some(limit as i64) } else { None })
644 .build();
645 let mut cursor = col
646 .find(filter)
647 .with_options(opts)
648 .await
649 .map_err(mongo_err)?;
650 let mut result = Vec::new();
651 use futures_util::StreamExt;
652 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
653 let d = doc_result.map_err(mongo_err)?;
654 if let Ok(s) = d.get_str("data") {
655 let h: InvocationHistory =
656 serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
657 message: e.to_string(),
658 })?;
659 result.push(h);
660 }
661 }
662 Ok(result)
663 }
664
665 async fn get_matching_runner_contexts(
666 &self,
667 partial_id: &str,
668 ) -> RustvelloResult<Vec<StoredRunnerContext>> {
669 let db = self.pool.db().await?;
670 let col = db.collection::<mongodb::bson::Document>(RUNNER_CTX_COL);
671 let filter = doc! { "_id": { "$regex": partial_id } };
672 let mut cursor = col.find(filter).await.map_err(mongo_err)?;
673 let mut result = Vec::new();
674 use futures_util::StreamExt;
675 while let Some(doc_result) = StreamExt::next(&mut cursor).await {
676 let d = doc_result.map_err(mongo_err)?;
677 if let Ok(s) = d.get_str("data") {
678 let ctx: StoredRunnerContext =
679 serde_json::from_str(s).map_err(|e| RustvelloError::Serialization {
680 message: e.to_string(),
681 })?;
682 result.push(ctx);
683 }
684 }
685 Ok(result)
686 }
687}