weavegraph 0.3.0

Graph-driven, concurrent agent workflow framework with versioned state, deterministic barrier merges, and rich diagnostics.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
//! Checkpointer infrastructure with thread‑safe persistence.
//!
//! This module provides:
//! - Thread‑safe async operations using `tokio::sync::RwLock`
//! - Tracing integration (`#[instrument]`) for observability
//! - Multiple backends: in‑memory (volatile) and SQLite (durable)
//!
//! # Thread Safety
//! All implementations use async‑aware synchronization primitives and can be
//! called across `await` points without blocking executor threads.
//!
//! # Observability
//! Each operation on `InMemoryCheckpointer` and `SQLiteCheckpointer` is
//! instrumented with structured tracing fields (e.g. `session_id`, `step`).
//! Enable debug logging to view activity:
//! ```bash
//! RUST_LOG=weavegraph::runtimes::checkpointer=debug cargo run
//! ```
//!
//! # Storage Management
//! - **InMemoryCheckpointer**: Stores only the latest checkpoint per session
//!   (implicit retention; no history).
//! - **SQLiteCheckpointer**: Stores full step history for durable audit and
//!   replay; use external SQL maintenance for long‑running deployments.
//!
//! See type‑level docs for cleanup guidance.

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use rustc_hash::FxHashMap;
use std::sync::RwLock;

use crate::{
    runtimes::session::SessionState, schedulers::SchedulerState, state::VersionedState,
    types::NodeKind,
};

/// A durable snapshot of session execution state at a barrier boundary.
///
/// This structure captures both the current state and execution history
/// to enable full session resumption and audit trails.
#[derive(Debug, Clone)]
pub struct Checkpoint {
    pub session_id: String,
    pub step: u64,
    pub state: VersionedState,
    pub frontier: Vec<NodeKind>,
    pub versions_seen: FxHashMap<String, FxHashMap<String, u64>>, // scheduler gating
    pub concurrency_limit: usize,
    pub created_at: DateTime<Utc>,
    /// Nodes that executed in this step (empty for step 0)
    pub ran_nodes: Vec<NodeKind>,
    /// Nodes that were skipped in this step (empty for step 0)
    pub skipped_nodes: Vec<NodeKind>,
    /// Channels that were updated in this step (empty for step 0)
    pub updated_channels: Vec<String>,
}

impl Checkpoint {
    /// Create a checkpoint from the current session state.
    ///
    /// This captures a snapshot of the session's execution state that can be
    /// persisted and later restored to resume execution from this point.
    ///
    /// # Parameters
    ///
    /// * `session_id` - Unique identifier for the session
    /// * `session` - Current session state to checkpoint
    ///
    /// # Returns
    ///
    /// A `Checkpoint` containing all necessary state for resumption
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use weavegraph::runtimes::{Checkpoint, SessionState};
    /// # fn example(session_state: SessionState) {
    /// let checkpoint = Checkpoint::from_session("my_session", &session_state);
    /// // checkpoint can now be saved via a Checkpointer
    /// # }
    /// ```
    #[must_use]
    pub fn from_session(session_id: &str, session: &SessionState) -> Self {
        Self {
            session_id: session_id.to_string(),
            step: session.step,
            state: session.state.clone(),
            frontier: session.frontier.clone(),
            versions_seen: session.scheduler_state.versions_seen.clone(),
            concurrency_limit: session.scheduler.concurrency_limit,
            created_at: Utc::now(),
            ran_nodes: vec![], // No execution history for raw session state
            skipped_nodes: vec![],
            updated_channels: vec![],
        }
    }

    /// Create a checkpoint from a completed step report.
    ///
    /// This captures the full execution context including what nodes ran,
    /// were skipped, and which channels were updated during the step.
    ///
    /// # Parameters
    ///
    /// * `session_id` - Unique identifier for the session
    /// * `session_state` - Current session state after step execution
    /// * `step_report` - Details of what happened during step execution
    ///
    /// # Returns
    ///
    /// A `Checkpoint` with complete step execution metadata
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// # use weavegraph::runtimes::{Checkpoint, SessionState, StepReport};
    /// # fn example(session_state: SessionState, step_report: StepReport) {
    /// let checkpoint = Checkpoint::from_step_report(
    ///     "my_session",
    ///     &session_state,
    ///     &step_report
    /// );
    /// # }
    /// ```
    #[must_use]
    pub fn from_step_report(
        session_id: &str,
        session_state: &SessionState,
        step_report: &crate::runtimes::execution::StepReport,
    ) -> Self {
        Self {
            session_id: session_id.to_string(),
            step: session_state.step,
            state: session_state.state.clone(),
            frontier: session_state.frontier.clone(),
            versions_seen: session_state.scheduler_state.versions_seen.clone(),
            concurrency_limit: session_state.scheduler.concurrency_limit,
            created_at: Utc::now(),
            ran_nodes: step_report.ran_nodes.clone(),
            skipped_nodes: step_report.skipped_nodes.clone(),
            updated_channels: step_report
                .barrier_outcome
                .updated_channels
                .iter()
                .map(|s| (*s).to_string())
                .collect(),
        }
    }
}

/// Errors from checkpointer operations.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "diagnostics", derive(miette::Diagnostic))]
pub enum CheckpointerError {
    /// Session was not found in the checkpointer.
    #[error("session not found: {session_id}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(
            code(weavegraph::checkpointer::not_found),
            help(
                "Ensure the session ID `{session_id}` is correct and the session has been created."
            )
        )
    )]
    NotFound { session_id: String },

    /// Backend storage error (database, filesystem, etc.).
    #[error("backend error: {message}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(
            code(weavegraph::checkpointer::backend),
            help("Check backend connectivity and permissions; backend message: {message}.")
        )
    )]
    Backend { message: String },

    /// Other checkpointer errors.
    #[error("checkpointer error: {message}")]
    #[cfg_attr(
        feature = "diagnostics",
        diagnostic(code(weavegraph::checkpointer::other))
    )]
    Other { message: String },
}

/// Selects the backing implementation of the `Checkpointer` trait.
///
/// Variants:
/// * `InMemory` – Volatile process‑local storage. Fast, non‑durable; suitable for
///   tests and ephemeral runs.
/// * `SQLite` – Durable, file (or memory) backed storage using `SQLiteCheckpointer`
///   (see `runtimes::checkpointer_sqlite`). Persists step history and the latest
///   snapshot for session resumption.
///
/// Note:
/// The runtime previously had an unreachable wildcard match when exhaustively
/// enumerating these variants. If additional variants are added in the future,
/// they should be explicitly matched (or a deliberate catch‑all retained).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CheckpointerType {
    /// In‑memory (non‑durable) checkpointing.
    InMemory,
    #[cfg(feature = "sqlite")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sqlite")))]
    /// SQLite‑backed durable checkpointing (see `SQLiteCheckpointer`).
    SQLite,
    #[cfg(feature = "postgres")]
    #[cfg_attr(docsrs, doc(cfg(feature = "postgres")))]
    /// PostgreSQL‑backed durable checkpointing (see `PostgresCheckpointer`).
    Postgres,
}

pub type Result<T> = std::result::Result<T, CheckpointerError>;

/// Trait for persistent storage and retrieval of workflow execution state.
///
/// Checkpointers provide durable storage for workflow execution state, enabling
/// session resumption across process restarts. Implementations must ensure that
/// checkpoints are atomic and consistent.
///
/// # Design Principles
///
/// - **Atomicity**: Checkpoint saves should be all-or-nothing operations
/// - **Consistency**: The stored state should always be in a valid, resumable state
/// - **Idempotency**: Saving the same checkpoint multiple times should be safe
/// - **Isolation**: Concurrent access to different sessions should not interfere
///
/// # Implementation Notes
///
/// - All operations should be idempotent where possible
/// - Concurrent access to the same session should be handled gracefully
/// - Backend errors should be mapped to appropriate `CheckpointerError` variants
/// - The `save` operation replaces any existing checkpoint for the session
/// - The `load_latest` operation returns `None` for non-existent sessions
///
/// # Thread Safety
///
/// All implementations must be `Send + Sync` to allow usage across async tasks
/// and thread boundaries. Interior mutability should use appropriate synchronization
/// primitives (e.g., `RwLock`, `Mutex`).
///
/// # Error Handling
///
/// Methods should return specific `CheckpointerError` variants:
/// - `NotFound`: When a session doesn't exist (only for operations that require it)
/// - `Backend`: For storage-related errors (database, filesystem, network)
/// - `Other`: For serialization errors or other unexpected conditions
///
/// # Examples
///
/// ```rust,no_run
/// use weavegraph::runtimes::{Checkpointer, Checkpoint, InMemoryCheckpointer};
/// use weavegraph::state::VersionedState;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let checkpointer = InMemoryCheckpointer::new();
///
/// // Save a checkpoint
/// let state = VersionedState::new_with_user_message("Hello");
/// // ... create checkpoint from session state
/// # let checkpoint = todo!(); // placeholder
/// checkpointer.save(checkpoint).await?;
///
/// // Load the latest checkpoint
/// if let Some(checkpoint) = checkpointer.load_latest("session_id").await? {
///     // Resume execution from checkpoint
///     println!("Resuming from step {}", checkpoint.step);
/// }
///
/// // List all sessions
/// let sessions = checkpointer.list_sessions().await?;
/// println!("Found {} sessions", sessions.len());
/// # Ok(())
/// # }
/// ```
#[async_trait]
pub trait Checkpointer: Send + Sync {
    /// Persist the latest checkpoint for a session.
    ///
    /// This operation should be atomic and idempotent. If a checkpoint already
    /// exists for the session, it will be replaced. The implementation should
    /// ensure that concurrent saves to the same session are handled safely.
    ///
    /// # Parameters
    ///
    /// * `checkpoint` - The checkpoint data to persist
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Checkpoint was successfully saved
    /// * `Err(CheckpointerError)` - Save operation failed
    ///
    /// # Errors
    ///
    /// * `Backend` - Storage backend error (database, filesystem, etc.)
    /// * `Other` - Serialization error or other unexpected condition
    async fn save(&self, checkpoint: Checkpoint) -> Result<()>;

    /// Load the most recent checkpoint for a session.
    ///
    /// Returns `None` if no checkpoint exists for the given session ID.
    /// This operation should be consistent with the latest `save` operation.
    ///
    /// # Parameters
    ///
    /// * `session_id` - Unique identifier for the session
    ///
    /// # Returns
    ///
    /// * `Ok(Some(checkpoint))` - Latest checkpoint was found and loaded
    /// * `Ok(None)` - No checkpoint exists for this session
    /// * `Err(CheckpointerError)` - Load operation failed
    ///
    /// # Errors
    ///
    /// * `Backend` - Storage backend error
    /// * `Other` - Deserialization error or corruption
    async fn load_latest(&self, session_id: &str) -> Result<Option<Checkpoint>>;

    /// List all session IDs known to this checkpointer.
    ///
    /// Returns a vector of session IDs that have at least one checkpoint
    /// stored. The order is implementation-defined but should be consistent.
    ///
    /// # Returns
    ///
    /// * `Ok(session_ids)` - List of all known session IDs
    /// * `Err(CheckpointerError)` - List operation failed
    ///
    /// # Errors
    ///
    /// * `Backend` - Storage backend error
    async fn list_sessions(&self) -> Result<Vec<String>>;
}

/// Simple in‑memory checkpointer with implicit retention.
///
/// Characteristics:
/// - Volatile: process‑local only
/// - Retention: last checkpoint per session (no historical steps)
/// - Concurrency: `std::sync::RwLock` for fast synchronous access (no async overhead)
/// - Observability: `#[instrument]` on public trait methods
///
/// Enable debug tracing to inspect operations:
/// ```bash
/// RUST_LOG=weavegraph::runtimes::checkpointer=debug
/// ```
#[derive(Default)]
pub struct InMemoryCheckpointer {
    inner: RwLock<FxHashMap<String, Checkpoint>>,
}

impl InMemoryCheckpointer {
    /// Create a new in-memory checkpointer.
    ///
    /// # Returns
    ///
    /// A new `InMemoryCheckpointer` instance
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: RwLock::new(FxHashMap::default()),
        }
    }
}

#[async_trait]
impl Checkpointer for InMemoryCheckpointer {
    #[tracing::instrument(skip(self), fields(session_id = %checkpoint.session_id, step = checkpoint.step))]
    async fn save(&self, checkpoint: Checkpoint) -> Result<()> {
        let mut map = self
            .inner
            .write()
            .expect("InMemoryCheckpointer RwLock poisoned");
        map.insert(checkpoint.session_id.clone(), checkpoint);
        Ok(())
    }

    #[tracing::instrument(skip(self), fields(session_id = %session_id))]
    async fn load_latest(&self, session_id: &str) -> Result<Option<Checkpoint>> {
        let map = self
            .inner
            .read()
            .expect("InMemoryCheckpointer RwLock poisoned");
        Ok(map.get(session_id).cloned())
    }

    #[tracing::instrument(skip(self))]
    async fn list_sessions(&self) -> Result<Vec<String>> {
        let map = self
            .inner
            .read()
            .expect("InMemoryCheckpointer RwLock poisoned");
        Ok(map.keys().cloned().collect())
    }
}

/// Restore a `SessionState` from a persisted `Checkpoint`.
///
/// This utility function reconstructs the in-memory session state from a
/// checkpoint, allowing execution to resume from the checkpointed step.
/// The restored state maintains all version information and scheduler state
/// for seamless continuation.
///
/// # Parameters
///
/// * `cp` - The checkpoint to restore from
///
/// # Returns
///
/// A `SessionState` ready for continued execution with:
/// - Restored versioned state channels (messages, extra)
/// - Correct step counter and frontier nodes
/// - Reconstructed scheduler with original concurrency limits
/// - Preserved version tracking for proper barrier coordination
///
/// # Examples
///
/// ```rust,no_run
/// # use weavegraph::runtimes::{restore_session_state, Checkpoint};
/// # async fn example(checkpoint: Checkpoint) {
/// let session_state = restore_session_state(&checkpoint);
/// // session_state can now be used to continue execution
/// assert_eq!(session_state.step, checkpoint.step);
/// assert_eq!(session_state.frontier, checkpoint.frontier);
/// # }
/// ```
#[must_use = "restored session state should be used to continue execution"]
pub fn restore_session_state(cp: &Checkpoint) -> SessionState {
    use crate::schedulers::Scheduler;
    SessionState {
        state: cp.state.clone(),
        step: cp.step,
        frontier: cp.frontier.clone(),
        scheduler: Scheduler::new(cp.concurrency_limit),
        scheduler_state: SchedulerState {
            versions_seen: cp.versions_seen.clone(),
        },
    }
}