// motosan_agent_workflow/execution.rs
//! Execution recording data model for workflow history tracking.
//!
//! Provides [`ExecutionRecord`] and [`NodeRecord`] for capturing complete
//! workflow execution history, including per-node timing, token usage,
//! tool calls, and aggregated statistics.

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::fs;
use tokio::sync::RwLock;

use crate::error::Result;
use crate::event::WorkflowEvent;

// ---------------------------------------------------------------------------
// ExecutionRecord
// ---------------------------------------------------------------------------

/// Complete record of a single workflow execution.
///
/// Built incrementally by [`ExecutionObserver`] as workflow events arrive
/// and persisted through an [`ExecutionStore`]. All timestamps are unix
/// epoch milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionRecord {
    /// Unique execution identifier.
    pub id: String,
    /// Identifier of the workflow that was executed.
    pub workflow_id: String,
    /// Human-readable workflow name.
    pub workflow_name: String,
    /// Current execution status.
    pub status: ExecutionStatus,
    /// Unix timestamp in milliseconds when execution started.
    pub started_at: u64,
    /// Unix timestamp in milliseconds when execution finished (if completed).
    pub finished_at: Option<u64>,
    /// Wall-clock duration in milliseconds.
    pub duration_ms: u64,
    /// Total LLM tokens consumed across all nodes.
    pub total_tokens: u64,
    /// Total input (prompt) tokens consumed across all nodes.
    pub total_input_tokens: u64,
    /// Total output (completion) tokens produced across all nodes.
    pub total_output_tokens: u64,
    /// Estimated USD cost of the execution.
    pub estimated_cost_usd: f64,
    /// Input data supplied to the workflow.
    pub input: Value,
    /// Per-node execution records in topological order.
    pub node_records: Vec<NodeRecord>,
    /// Top-level error message if the execution failed.
    pub error: Option<String>,
    /// How the execution was triggered.
    pub trigger: TriggerType,
    /// Arbitrary key-value metadata.
    pub metadata: HashMap<String, Value>,
}
61
/// Overall status of a workflow execution.
///
/// Serialized in `snake_case` (e.g. `"running"`, `"success"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionStatus {
    /// Execution is currently in progress.
    Running,
    /// Execution finished successfully.
    Success,
    /// Execution terminated with an error.
    Failed,
    /// Execution is paused.
    Paused,
    /// Execution was cancelled before completion.
    Cancelled,
}
72
/// How a workflow execution was triggered.
///
/// Serialized in `snake_case` (e.g. `"manual"`, `"webhook"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerType {
    /// Started explicitly by a user.
    Manual,
    /// Started by a schedule.
    Scheduled,
    /// Started by an incoming webhook.
    Webhook,
    /// Started via the programmatic API.
    Api,
}
82
// ---------------------------------------------------------------------------
// NodeRecord
// ---------------------------------------------------------------------------

/// Record of a single node's execution within a workflow run.
///
/// Created in `Running` state when the node starts and filled in by
/// subsequent completion/failure events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeRecord {
    /// Node identifier within the workflow DAG.
    pub node_id: String,
    /// Human-readable node name.
    pub node_name: String,
    /// Node kind label (e.g. "agent", "human", "transform", "script").
    pub node_kind: String,
    /// Current node status.
    pub status: NodeStatus,
    /// Unix timestamp in milliseconds when the node started.
    pub started_at: u64,
    /// Unix timestamp in milliseconds when the node finished (if completed).
    pub finished_at: Option<u64>,
    /// Wall-clock duration in milliseconds.
    pub duration_ms: u64,
    /// Input data supplied to this node.
    pub input: Value,
    /// Output data produced by this node.
    pub output: Option<Value>,
    /// LLM tokens consumed by this node (backward-compatible total).
    pub tokens_used: u64,
    /// Input (prompt) tokens consumed by this node.
    pub input_tokens: u64,
    /// Output (completion) tokens produced by this node.
    pub output_tokens: u64,
    /// Total tokens (input + output) consumed by this node.
    pub total_tokens: u64,
    /// Estimated USD cost of this node's LLM usage.
    pub estimated_cost_usd: f64,
    /// Individual LLM call records for this node.
    pub llm_calls: Vec<LlmCallRecord>,
    /// Number of retry attempts before the final result.
    pub retry_attempts: u32,
    /// Error message if the node failed.
    pub error: Option<String>,
    /// Tool calls made during node execution.
    pub tool_calls: Vec<ToolCallRecord>,
}
127
/// Status of an individual node execution.
///
/// Serialized in `snake_case` (e.g. `"pending"`, `"success"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NodeStatus {
    /// Not yet started.
    Pending,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Success,
    /// Finished with an error.
    Failed,
    /// Skipped without executing.
    Skipped,
}
138
// ---------------------------------------------------------------------------
// LlmCallRecord
// ---------------------------------------------------------------------------

/// Record of a single LLM invocation within a node execution.
///
/// A node may make several LLM calls (initial attempt, corrections,
/// retries); each one gets its own record in [`NodeRecord::llm_calls`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmCallRecord {
    /// Number of input (prompt) tokens consumed.
    pub input_tokens: u64,
    /// Number of output (completion) tokens produced.
    pub output_tokens: u64,
    /// Wall-clock duration of this LLM call in milliseconds.
    pub duration_ms: u64,
    /// Whether this call was a retry attempt.
    pub is_retry: bool,
    /// Reason for this call (e.g. "initial", "schema_correction", "retry_attempt_N").
    pub reason: String,
}
157
// ---------------------------------------------------------------------------
// ToolCallRecord
// ---------------------------------------------------------------------------

/// Record of a single tool invocation made by a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallRecord {
    /// Tool name / identifier.
    pub tool: String,
    /// Arguments passed to the tool.
    pub args: Value,
    /// Result returned by the tool, if available.
    pub result: Option<Value>,
    /// Wall-clock duration of the tool call in milliseconds.
    pub duration_ms: u64,
}
174
// ---------------------------------------------------------------------------
// ExecutionStats / aggregation
// ---------------------------------------------------------------------------

/// Aggregated statistics across multiple workflow executions.
///
/// Produced by [`ExecutionStore::stats`] for the set of records matching
/// an [`ExecutionFilter`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStats {
    /// Total number of executions.
    pub total_executions: u64,
    /// Number of successful executions.
    pub success_count: u64,
    /// Number of failed executions.
    pub failure_count: u64,
    /// Average execution duration in milliseconds.
    pub avg_duration_ms: f64,
    /// Total LLM tokens consumed.
    pub total_tokens: u64,
    /// Total estimated cost in USD.
    pub total_cost_usd: f64,
    /// Failure rate as a ratio (0.0 – 1.0).
    pub error_rate: f64,
    /// Most frequent errors, ordered by count descending.
    pub top_errors: Vec<ErrorSummary>,
    /// Per-workflow breakdown.
    pub per_workflow: HashMap<String, WorkflowStats>,
}
201
/// Summary of a recurring error.
///
/// Errors are grouped by exact message text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorSummary {
    /// Error message text.
    pub message: String,
    /// Number of occurrences.
    pub count: u64,
    /// Unix timestamp in milliseconds of the most recent occurrence.
    pub last_seen: u64,
}
212
/// Per-workflow aggregated statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStats {
    /// Workflow identifier.
    pub workflow_id: String,
    /// Total executions for this workflow.
    pub execution_count: u64,
    /// Success rate as a ratio (0.0 – 1.0).
    pub success_rate: f64,
    /// Average execution duration in milliseconds.
    pub avg_duration_ms: f64,
    /// Average tokens per execution.
    pub avg_tokens: f64,
    /// Average cost per execution in USD.
    pub avg_cost_usd: f64,
}
229
// ---------------------------------------------------------------------------
// ExecutionFilter
// ---------------------------------------------------------------------------

/// Filter criteria for querying execution records.
///
/// All criteria are optional and combine with AND. Time bounds apply to
/// `started_at` and form a half-open range: `since` is inclusive, `until`
/// is exclusive. `limit`/`offset` paginate the already-filtered result.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ExecutionFilter {
    /// Filter by workflow identifier.
    pub workflow_id: Option<String>,
    /// Filter by execution status.
    pub status: Option<ExecutionStatus>,
    /// Only include executions started at or after this unix timestamp (ms).
    pub since: Option<u64>,
    /// Only include executions started before this unix timestamp (ms).
    pub until: Option<u64>,
    /// Maximum number of records to return.
    pub limit: Option<usize>,
    /// Number of records to skip (for pagination).
    pub offset: Option<usize>,
}
250
// ---------------------------------------------------------------------------
// ExecutionStore trait
// ---------------------------------------------------------------------------

/// Pluggable storage backend for execution records.
///
/// `Send + Sync` so implementations can be shared across tasks behind an
/// `Arc<dyn ExecutionStore>` (as [`ExecutionObserver`] does).
#[async_trait]
pub trait ExecutionStore: Send + Sync {
    /// Persist an execution record.
    async fn save(&self, record: &ExecutionRecord) -> Result<()>;
    /// Retrieve an execution record by ID, returning `None` if not found.
    async fn get(&self, id: &str) -> Result<Option<ExecutionRecord>>;
    /// List execution records matching the given filter.
    async fn list(&self, filter: &ExecutionFilter) -> Result<Vec<ExecutionRecord>>;
    /// Compute aggregated statistics for executions matching the filter.
    async fn stats(&self, filter: &ExecutionFilter) -> Result<ExecutionStats>;
    /// Delete execution records whose `finished_at` is older than the given
    /// unix timestamp in milliseconds. Returns the number of records deleted.
    async fn prune(&self, older_than_ms: u64) -> Result<u64>;
}
270
// ---------------------------------------------------------------------------
// FileExecutionStore
// ---------------------------------------------------------------------------

/// File-system-backed [`ExecutionStore`] that stores one JSON file per execution.
///
/// Storage layout:
/// ```text
/// base_dir/
/// └── executions/
///     ├── {id}.json
///     └── ...
/// ```
pub struct FileExecutionStore {
    /// Root directory under which the `executions/` subdirectory lives.
    base_dir: PathBuf,
}
287
288impl FileExecutionStore {
289    /// Create a new `FileExecutionStore` rooted at `dir`.
290    pub fn new(dir: impl Into<PathBuf>) -> Self {
291        Self {
292            base_dir: dir.into(),
293        }
294    }
295
296    /// Returns the path to the `executions/` subdirectory.
297    fn executions_dir(&self) -> PathBuf {
298        self.base_dir.join("executions")
299    }
300
301    /// Returns the path for a specific execution record file.
302    fn record_path(&self, id: &str) -> PathBuf {
303        self.executions_dir().join(format!("{id}.json"))
304    }
305
306    /// Read all execution records from disk, optionally applying a filter.
307    async fn read_all(&self, filter: &ExecutionFilter) -> Result<Vec<ExecutionRecord>> {
308        let dir = self.executions_dir();
309        if !dir.exists() {
310            return Ok(Vec::new());
311        }
312
313        let mut entries = fs::read_dir(&dir).await?;
314        let mut records = Vec::new();
315
316        while let Some(entry) = entries.next_entry().await? {
317            let path = entry.path();
318            if path.extension().and_then(|e| e.to_str()) != Some("json") {
319                continue;
320            }
321            let data = fs::read(&path).await?;
322            let record: ExecutionRecord = serde_json::from_slice(&data)
323                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
324
325            // Apply filters
326            if let Some(ref wf_id) = filter.workflow_id {
327                if record.workflow_id != *wf_id {
328                    continue;
329                }
330            }
331            if let Some(ref status) = filter.status {
332                if record.status != *status {
333                    continue;
334                }
335            }
336            if let Some(since) = filter.since {
337                if record.started_at < since {
338                    continue;
339                }
340            }
341            if let Some(until) = filter.until {
342                if record.started_at >= until {
343                    continue;
344                }
345            }
346
347            records.push(record);
348        }
349
350        // Sort by started_at descending
351        records.sort_by(|a, b| b.started_at.cmp(&a.started_at));
352
353        Ok(records)
354    }
355}
356
357#[async_trait]
358impl ExecutionStore for FileExecutionStore {
359    async fn save(&self, record: &ExecutionRecord) -> Result<()> {
360        let dir = self.executions_dir();
361        fs::create_dir_all(&dir).await?;
362        let path = self.record_path(&record.id);
363        let data = serde_json::to_vec_pretty(record)
364            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
365        fs::write(&path, data).await?;
366        Ok(())
367    }
368
369    async fn get(&self, id: &str) -> Result<Option<ExecutionRecord>> {
370        let path = self.record_path(id);
371        if !path.exists() {
372            return Ok(None);
373        }
374        let data = fs::read(&path).await?;
375        let record: ExecutionRecord = serde_json::from_slice(&data)
376            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
377        Ok(Some(record))
378    }
379
380    async fn list(&self, filter: &ExecutionFilter) -> Result<Vec<ExecutionRecord>> {
381        let mut records = self.read_all(filter).await?;
382
383        // Apply offset
384        if let Some(offset) = filter.offset {
385            if offset >= records.len() {
386                return Ok(Vec::new());
387            }
388            records = records.split_off(offset);
389        }
390
391        // Apply limit
392        if let Some(limit) = filter.limit {
393            records.truncate(limit);
394        }
395
396        Ok(records)
397    }
398
399    async fn stats(&self, filter: &ExecutionFilter) -> Result<ExecutionStats> {
400        let records = self.read_all(filter).await?;
401
402        let total_executions = records.len() as u64;
403        let mut success_count: u64 = 0;
404        let mut failure_count: u64 = 0;
405        let mut total_duration: u64 = 0;
406        let mut total_tokens: u64 = 0;
407        let mut total_cost_usd: f64 = 0.0;
408        let mut error_counts: HashMap<String, (u64, u64)> = HashMap::new(); // msg -> (count, last_seen)
409        let mut per_workflow: HashMap<String, Vec<&ExecutionRecord>> = HashMap::new();
410
411        for record in &records {
412            match record.status {
413                ExecutionStatus::Success => success_count += 1,
414                ExecutionStatus::Failed => failure_count += 1,
415                _ => {}
416            }
417            total_duration += record.duration_ms;
418            total_tokens += record.total_tokens;
419            total_cost_usd += record.estimated_cost_usd;
420
421            if let Some(ref err) = record.error {
422                let entry = error_counts.entry(err.clone()).or_insert((0, 0));
423                entry.0 += 1;
424                if record.started_at > entry.1 {
425                    entry.1 = record.started_at;
426                }
427            }
428
429            per_workflow
430                .entry(record.workflow_id.clone())
431                .or_default()
432                .push(record);
433        }
434
435        let avg_duration_ms = if total_executions > 0 {
436            total_duration as f64 / total_executions as f64
437        } else {
438            0.0
439        };
440
441        let error_rate = if total_executions > 0 {
442            failure_count as f64 / total_executions as f64
443        } else {
444            0.0
445        };
446
447        let mut top_errors: Vec<ErrorSummary> = error_counts
448            .into_iter()
449            .map(|(message, (count, last_seen))| ErrorSummary {
450                message,
451                count,
452                last_seen,
453            })
454            .collect();
455        top_errors.sort_by(|a, b| b.count.cmp(&a.count));
456
457        let per_workflow = per_workflow
458            .into_iter()
459            .map(|(wf_id, recs)| {
460                let count = recs.len() as u64;
461                let successes = recs
462                    .iter()
463                    .filter(|r| r.status == ExecutionStatus::Success)
464                    .count() as u64;
465                let dur: u64 = recs.iter().map(|r| r.duration_ms).sum();
466                let tok: u64 = recs.iter().map(|r| r.total_tokens).sum();
467                let cost: f64 = recs.iter().map(|r| r.estimated_cost_usd).sum();
468
469                let stats = WorkflowStats {
470                    workflow_id: wf_id.clone(),
471                    execution_count: count,
472                    success_rate: if count > 0 {
473                        successes as f64 / count as f64
474                    } else {
475                        0.0
476                    },
477                    avg_duration_ms: if count > 0 {
478                        dur as f64 / count as f64
479                    } else {
480                        0.0
481                    },
482                    avg_tokens: if count > 0 {
483                        tok as f64 / count as f64
484                    } else {
485                        0.0
486                    },
487                    avg_cost_usd: if count > 0 {
488                        cost as f64 / count as f64
489                    } else {
490                        0.0
491                    },
492                };
493                (wf_id, stats)
494            })
495            .collect();
496
497        Ok(ExecutionStats {
498            total_executions,
499            success_count,
500            failure_count,
501            avg_duration_ms,
502            total_tokens,
503            total_cost_usd,
504            error_rate,
505            top_errors,
506            per_workflow,
507        })
508    }
509
510    async fn prune(&self, older_than_ms: u64) -> Result<u64> {
511        let dir = self.executions_dir();
512        if !dir.exists() {
513            return Ok(0);
514        }
515
516        let mut entries = fs::read_dir(&dir).await?;
517        let mut deleted: u64 = 0;
518
519        while let Some(entry) = entries.next_entry().await? {
520            let path = entry.path();
521            if path.extension().and_then(|e| e.to_str()) != Some("json") {
522                continue;
523            }
524            let data = fs::read(&path).await?;
525            let record: ExecutionRecord = match serde_json::from_slice(&data) {
526                Ok(r) => r,
527                Err(_) => continue,
528            };
529
530            if let Some(finished_at) = record.finished_at {
531                if finished_at < older_than_ms {
532                    fs::remove_file(&path).await?;
533                    deleted += 1;
534                }
535            }
536        }
537
538        Ok(deleted)
539    }
540}
541
// ---------------------------------------------------------------------------
// ExecutionObserver
// ---------------------------------------------------------------------------

/// Returns the current unix timestamp in milliseconds.
///
/// # Panics
///
/// Panics if the system clock is set to a time before the unix epoch
/// (which would make `duration_since(UNIX_EPOCH)` fail).
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the unix epoch")
        .as_millis() as u64
}
553
/// Observer that listens to [`WorkflowEvent`]s and automatically builds
/// and persists an [`ExecutionRecord`].
///
/// Events arrive sequentially from a single channel, so the observer does
/// not need to handle concurrent writes. However, `handle_event` takes
/// `&self` so internal state is wrapped in a [`RwLock`].
pub struct ExecutionObserver {
    /// Destination store for completed (and explicitly flushed) records.
    store: Arc<dyn ExecutionStore>,
    /// The in-flight record for the current execution, if one is running.
    current: RwLock<Option<ExecutionRecord>>,
}
564
impl ExecutionObserver {
    /// Create a new observer backed by the given store.
    pub fn new(store: Arc<dyn ExecutionStore>) -> Self {
        Self {
            store,
            current: RwLock::new(None),
        }
    }

    /// Process a single [`WorkflowEvent`], updating the in-flight record.
    ///
    /// Lifecycle: `WorkflowStarted` creates a fresh record (replacing any
    /// previous in-flight one); node events update the matching
    /// [`NodeRecord`]; `WorkflowCompleted` / `WorkflowFailed` finalize the
    /// record and flush it to the store. Events arriving when no record
    /// (or no matching node) exists are silently ignored.
    pub async fn handle_event(&self, event: &WorkflowEvent) {
        match event {
            WorkflowEvent::WorkflowStarted { workflow_id, .. } => {
                // Start a brand-new record; any previous in-flight record
                // is replaced.
                let record = ExecutionRecord {
                    id: uuid::Uuid::new_v4().to_string(),
                    workflow_id: workflow_id.clone(),
                    // NOTE(review): name/input/trigger are not carried by
                    // this event variant, so they are initialized to
                    // placeholders (empty name, Null input, Api trigger) —
                    // confirm whether callers backfill them.
                    workflow_name: String::new(),
                    status: ExecutionStatus::Running,
                    started_at: now_ms(),
                    finished_at: None,
                    duration_ms: 0,
                    total_tokens: 0,
                    total_input_tokens: 0,
                    total_output_tokens: 0,
                    estimated_cost_usd: 0.0,
                    input: Value::Null,
                    node_records: Vec::new(),
                    error: None,
                    trigger: TriggerType::Api,
                    metadata: HashMap::new(),
                };
                let mut current = self.current.write().await;
                *current = Some(record);
            }

            WorkflowEvent::NodeStarted { node_id, node_name, node_kind, .. } => {
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    // Append a fresh node record in Running state; timing
                    // and token counters are filled in by later events.
                    let node_record = NodeRecord {
                        node_id: node_id.clone(),
                        node_name: node_name.clone(),
                        node_kind: node_kind.clone(),
                        status: NodeStatus::Running,
                        started_at: now_ms(),
                        finished_at: None,
                        duration_ms: 0,
                        input: Value::Null,
                        output: None,
                        tokens_used: 0,
                        input_tokens: 0,
                        output_tokens: 0,
                        total_tokens: 0,
                        estimated_cost_usd: 0.0,
                        llm_calls: Vec::new(),
                        retry_attempts: 0,
                        error: None,
                        tool_calls: Vec::new(),
                    };
                    record.node_records.push(node_record);
                }
            }

            WorkflowEvent::NodeInputResolved { node_id, input, .. } => {
                // Record the resolved input on the matching node.
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.input = input.clone();
                    }
                }
            }

            WorkflowEvent::NodeCompleted {
                node_id,
                duration_ms,
                input_tokens,
                output_tokens,
                tokens_used,
                output,
                ..
            } => {
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.status = NodeStatus::Success;
                        node.duration_ms = *duration_ms;
                        node.tokens_used = *tokens_used;
                        node.input_tokens = *input_tokens;
                        node.output_tokens = *output_tokens;
                        // total_tokens mirrors tokens_used, the
                        // backward-compatible single counter.
                        node.total_tokens = *tokens_used;
                        node.finished_at = Some(now_ms());
                        node.output = output.clone();
                    }
                    // Update execution-level aggregates.
                    record.total_input_tokens += *input_tokens;
                    record.total_output_tokens += *output_tokens;
                }
            }

            WorkflowEvent::NodeFailed {
                node_id, error, ..
            } => {
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.status = NodeStatus::Failed;
                        node.error = Some(error.clone());
                        node.finished_at = Some(now_ms());
                    }
                }
            }

            WorkflowEvent::NodeRetryAttempt { node_id, .. } => {
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.retry_attempts += 1;
                    }
                }
            }

            WorkflowEvent::WorkflowCompleted {
                duration_ms,
                total_tokens,
                ..
            } => {
                // Inner scope releases the write lock before `flush` takes
                // the read lock below.
                {
                    let mut current = self.current.write().await;
                    if let Some(ref mut record) = *current {
                        record.status = ExecutionStatus::Success;
                        record.finished_at = Some(now_ms());
                        record.duration_ms = *duration_ms;
                        record.total_tokens = *total_tokens;
                    }
                }
                // Auto-flush on completion; ignore store errors here.
                let _ = self.flush().await;
            }

            WorkflowEvent::WorkflowFailed {
                error, duration_ms, ..
            } => {
                // Inner scope releases the write lock before `flush` takes
                // the read lock below.
                {
                    let mut current = self.current.write().await;
                    if let Some(ref mut record) = *current {
                        record.status = ExecutionStatus::Failed;
                        record.error = Some(error.clone());
                        record.finished_at = Some(now_ms());
                        record.duration_ms = *duration_ms;
                    }
                }
                // Auto-flush on failure; ignore store errors here.
                let _ = self.flush().await;
            }

            WorkflowEvent::NodeLlmCall {
                node_id,
                input_tokens,
                output_tokens,
                duration_ms,
                is_retry,
                reason,
                ..
            } => {
                // Append a per-call record to the node's LLM call history.
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.llm_calls.push(LlmCallRecord {
                            input_tokens: *input_tokens,
                            output_tokens: *output_tokens,
                            duration_ms: *duration_ms,
                            is_retry: *is_retry,
                            reason: reason.clone(),
                        });
                    }
                }
            }

            // All other event variants are ignored.
            _ => {}
        }
    }

    /// Flush the current in-flight record to the store.
    ///
    /// No-op when there is no in-flight record. The (tokio) read lock is
    /// held across the store write, which is safe with `tokio::sync::RwLock`.
    pub async fn flush(&self) -> Result<()> {
        let current = self.current.read().await;
        if let Some(ref record) = *current {
            self.store.save(record).await?;
        }
        Ok(())
    }
}
756
757/// Find a mutable reference to a [`NodeRecord`] by `node_id`.
758fn find_node_record<'a>(records: &'a mut [NodeRecord], node_id: &str) -> Option<&'a mut NodeRecord> {
759    records.iter_mut().find(|r| r.node_id == node_id)
760}