Skip to main content

lash_sqlite_store/
effect_replay.rs

1//! SQLite-backed runtime effect replay host (tokio-rusqlite port of the the prior store
2//! store's `effect_replay`).
3//!
4//! The public surface is identical to the prior implementation — same type names
5//! (`SqliteEffectHost`, `SqliteRuntimeEffectController`, `SqliteEffectReplayOptions`),
6//! same async signatures — so consumers swap backends with a path rename only.
7//! The only mechanical change is the database layer: the prior store ran every op directly
8//! on `&Connection` with `.await`; here every database body is a *synchronous*
9//! rusqlite closure handed to the shared [`SqliteConnection`] wrapper. The
10//! claim/finalize paths run inside `conn.write` (`BEGIN IMMEDIATE`) so the
11//! cross-process write lock is taken up front — the lease fence WAL gives us.
12
13use std::path::Path;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::time::Duration;
17
18use lash_core::{
19    DurabilityTier, EffectHost, EffectScope, PluginError, ProcessCommand, ProcessEffectOutcome,
20    RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
21    RuntimeEffectEnvelope, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeError,
22    ScopedEffectController,
23};
24
25use super::*;
26
27const STATUS_IN_PROGRESS: &str = "in_progress";
28const STATUS_COMPLETED: &str = "completed";
29const STATUS_FAILED: &str = "failed";
30const DEFAULT_LEASE_TTL: Duration = Duration::from_secs(30);
31const BUSY_POLL: Duration = Duration::from_millis(25);
32
33static EFFECT_OWNER_COUNTER: AtomicU64 = AtomicU64::new(1);
34
35/// Options for SQLite-backed runtime effect replay.
36#[derive(Clone, Debug)]
37pub struct SqliteEffectReplayOptions {
38    pub lease_ttl: Duration,
39}
40
41impl Default for SqliteEffectReplayOptions {
42    fn default() -> Self {
43        Self {
44            lease_ttl: DEFAULT_LEASE_TTL,
45        }
46    }
47}
48
49struct SqliteEffectReplayInner {
50    conn: SqliteConnection,
51    owner_id: String,
52    lease_counter: AtomicU64,
53    replay_mode: AtomicBool,
54    lease_ttl_ms: u64,
55}
56
57/// Deployment-level SQLite effect host.
58///
59/// This host persists runtime effect history in a local SQLite database and
60/// returns scoped controllers that replay completed outcomes by
61/// `(scope_id, replay_key)`.
62#[derive(Clone)]
63pub struct SqliteEffectHost {
64    inner: Arc<SqliteEffectReplayInner>,
65}
66
67/// Scoped SQLite-backed runtime effect controller.
68#[derive(Clone)]
69pub struct SqliteRuntimeEffectController {
70    inner: Arc<SqliteEffectReplayInner>,
71    scope: EffectScope,
72}
73
74struct ClaimedEffect {
75    scope_id: String,
76    replay_key: String,
77    envelope_hash: String,
78    lease_token: String,
79    due_at_ms: Option<u64>,
80}
81
82enum PreparedEffect {
83    ReplayOutcome {
84        outcome: Box<RuntimeEffectOutcome>,
85        due_at_ms: Option<u64>,
86    },
87    ReplayError(RuntimeEffectControllerError),
88    Claimed(ClaimedEffect),
89    Busy {
90        retry_at_ms: u64,
91    },
92}
93
94impl SqliteEffectHost {
95    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
96        Self::open_with_options(path, SqliteEffectReplayOptions::default()).await
97    }
98
99    pub async fn open_with_options(
100        path: &Path,
101        options: SqliteEffectReplayOptions,
102    ) -> tokio_rusqlite::Result<Self> {
103        Ok(Self {
104            inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
105        })
106    }
107
108    pub async fn memory() -> tokio_rusqlite::Result<Self> {
109        Self::memory_with_options(SqliteEffectReplayOptions::default()).await
110    }
111
112    pub async fn memory_with_options(
113        options: SqliteEffectReplayOptions,
114    ) -> tokio_rusqlite::Result<Self> {
115        Ok(Self {
116            inner: open_effect_replay_memory_inner(options).await?,
117        })
118    }
119
120    /// Force strict replay mode: missing effect history fails instead of
121    /// executing locally. Normal operation still replays any completed row.
122    pub fn start_replay(&self) {
123        self.inner.replay_mode.store(true, Ordering::SeqCst);
124    }
125}
126
127impl EffectHost for SqliteEffectHost {
128    fn durability_tier(&self) -> DurabilityTier {
129        DurabilityTier::Durable
130    }
131
132    fn requires_durable_attachment_store(&self) -> bool {
133        true
134    }
135
136    fn scoped<'run>(
137        &'run self,
138        scope: EffectScope,
139    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
140        let controller = SqliteRuntimeEffectController {
141            inner: Arc::clone(&self.inner),
142            scope: scope.clone(),
143        };
144        ScopedEffectController::shared(Arc::new(controller), scope)
145    }
146
147    fn scoped_static(
148        &self,
149        scope: EffectScope,
150    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
151        let controller = SqliteRuntimeEffectController {
152            inner: Arc::clone(&self.inner),
153            scope: scope.clone(),
154        };
155        Ok(Some(ScopedEffectController::shared(
156            Arc::new(controller),
157            scope,
158        )?))
159    }
160}
161
162impl SqliteRuntimeEffectController {
163    pub async fn open(path: &Path, scope: EffectScope) -> tokio_rusqlite::Result<Self> {
164        Self::open_with_options(path, scope, SqliteEffectReplayOptions::default()).await
165    }
166
167    pub async fn open_with_options(
168        path: &Path,
169        scope: EffectScope,
170        options: SqliteEffectReplayOptions,
171    ) -> tokio_rusqlite::Result<Self> {
172        Ok(Self {
173            inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
174            scope,
175        })
176    }
177
178    pub async fn memory(scope: EffectScope) -> tokio_rusqlite::Result<Self> {
179        Self::memory_with_options(scope, SqliteEffectReplayOptions::default()).await
180    }
181
182    pub async fn memory_with_options(
183        scope: EffectScope,
184        options: SqliteEffectReplayOptions,
185    ) -> tokio_rusqlite::Result<Self> {
186        Ok(Self {
187            inner: open_effect_replay_memory_inner(options).await?,
188            scope,
189        })
190    }
191
192    /// Force strict replay mode: missing effect history fails instead of
193    /// executing locally. Normal operation still replays any completed row.
194    pub fn start_replay(&self) {
195        self.inner.replay_mode.store(true, Ordering::SeqCst);
196    }
197
198    async fn prepare_effect(
199        &self,
200        envelope: &RuntimeEffectEnvelope,
201    ) -> Result<PreparedEffect, RuntimeEffectControllerError> {
202        let replay_key = envelope
203            .invocation
204            .replay_key()
205            .ok_or_else(|| {
206                RuntimeEffectControllerError::new(
207                    "sqlite_effect_replay_key_missing",
208                    "runtime effect envelope requires replay.key",
209                )
210            })?
211            .to_string();
212        let envelope_hash = envelope.stable_hash()?;
213        let scope_id = self.scope.id().to_string();
214        let now = current_epoch_ms();
215        let lease_token = self.inner.next_lease_token();
216        let due_at_ms = sleep_due_at_ms(envelope, now);
217        let lease_expires_at_ms = now.saturating_add(self.inner.lease_ttl_ms);
218        let replay_mode = self.inner.replay_mode.load(Ordering::SeqCst);
219        let owner_id = self.inner.owner_id.clone();
220        let lease_ttl_ms = self.inner.lease_ttl_ms;
221
222        // The `BEGIN IMMEDIATE` transaction is run on the connection thread via
223        // `conn.write`. The closure returns a `rusqlite::Result` carrying our
224        // own outcome `Result<PreparedEffect, RuntimeEffectControllerError>`:
225        // the SQL committing is independent of whether the recorded effect was
226        // a success, a failure, or a fresh claim — exactly the prior behaviour
227        // (commit on `Ok(_)` for any `PreparedEffect` variant). Only a real
228        // SQLite error rolls back.
229        let outcome: Result<PreparedEffect, RuntimeEffectControllerError> = self
230            .inner
231            .conn
232            .write(move |tx| {
233                let row = tx
234                    .query_row(
235                        "SELECT envelope_hash, status, outcome_json, error_json,
236                                lease_owner_id, lease_token, lease_expires_at_ms, due_at_ms
237                         FROM runtime_effect_replay
238                         WHERE scope_id = ?1 AND replay_key = ?2",
239                        params![scope_id.as_str(), replay_key.as_str()],
240                        |row| {
241                            Ok((
242                                row.get::<_, String>(0)?,
243                                row.get::<_, String>(1)?,
244                                row.get::<_, Option<String>>(2)?,
245                                row.get::<_, Option<String>>(3)?,
246                                row.get::<_, i64>(6)?,
247                                row.get::<_, Option<i64>>(7)?,
248                            ))
249                        },
250                    )
251                    .optional()?;
252
253                let Some((
254                    existing_hash,
255                    status,
256                    outcome_json,
257                    error_json,
258                    lease_expires_row,
259                    existing_due_row,
260                )) = row
261                else {
262                    if replay_mode {
263                        return Ok(Err(RuntimeEffectControllerError::new(
264                            "sqlite_effect_replay_missing",
265                            format!(
266                                "no recorded runtime effect for scope `{scope_id}` and replay key `{replay_key}`"
267                            ),
268                        )));
269                    }
270                    let due_at_param = due_at_ms.map(|value| value as i64);
271                    tx.execute(
272                        "INSERT INTO runtime_effect_replay (
273                            scope_id, replay_key, envelope_hash, status, outcome_json,
274                            error_json, lease_owner_id, lease_token, lease_expires_at_ms,
275                            due_at_ms, created_at_ms, updated_at_ms
276                         )
277                         VALUES (?1, ?2, ?3, ?4, NULL, NULL, ?5, ?6, ?7, ?8, ?9, ?10)",
278                        params![
279                            scope_id.as_str(),
280                            replay_key.as_str(),
281                            envelope_hash.as_str(),
282                            STATUS_IN_PROGRESS,
283                            owner_id.as_str(),
284                            lease_token.as_str(),
285                            lease_expires_at_ms as i64,
286                            due_at_param,
287                            now as i64,
288                            now as i64,
289                        ],
290                    )?;
291                    return Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
292                        scope_id,
293                        replay_key,
294                        envelope_hash,
295                        lease_token,
296                        due_at_ms,
297                    })));
298                };
299
300                if existing_hash != envelope_hash {
301                    return Ok(Err(RuntimeEffectControllerError::new(
302                        "sqlite_effect_replay_hash_conflict",
303                        format!(
304                            "runtime effect replay key `{replay_key}` in scope `{scope_id}` was reused with a different envelope hash"
305                        ),
306                    )));
307                }
308
309                let lease_expires_at_ms = lease_expires_row as u64;
310                let existing_due_at_ms = existing_due_row.map(|value| value as u64);
311
312                match status.as_str() {
313                    STATUS_COMPLETED => {
314                        let Some(json) = outcome_json else {
315                            return Ok(Err(RuntimeEffectControllerError::new(
316                                "sqlite_effect_replay_corrupt_row",
317                                "completed runtime effect row is missing outcome_json",
318                            )));
319                        };
320                        let outcome = match serde_json::from_str(&json) {
321                            Ok(outcome) => outcome,
322                            Err(err) => return Ok(Err(effect_decode_error(err))),
323                        };
324                        Ok(Ok(PreparedEffect::ReplayOutcome {
325                            outcome: Box::new(outcome),
326                            due_at_ms: existing_due_at_ms,
327                        }))
328                    }
329                    STATUS_FAILED => {
330                        let Some(json) = error_json else {
331                            return Ok(Err(RuntimeEffectControllerError::new(
332                                "sqlite_effect_replay_corrupt_row",
333                                "failed runtime effect row is missing error_json",
334                            )));
335                        };
336                        let err = match serde_json::from_str(&json) {
337                            Ok(err) => err,
338                            Err(err) => return Ok(Err(effect_decode_error(err))),
339                        };
340                        Ok(Ok(PreparedEffect::ReplayError(err)))
341                    }
342                    STATUS_IN_PROGRESS if lease_expires_at_ms > now => Ok(Ok(PreparedEffect::Busy {
343                        retry_at_ms: lease_expires_at_ms,
344                    })),
345                    STATUS_IN_PROGRESS => {
346                        let due_at_ms = existing_due_at_ms.or(due_at_ms);
347                        let due_at_param = due_at_ms.map(|value| value as i64);
348                        tx.execute(
349                            "UPDATE runtime_effect_replay
350                             SET lease_owner_id = ?3,
351                                 lease_token = ?4,
352                                 lease_expires_at_ms = ?5,
353                                 due_at_ms = ?6,
354                                 updated_at_ms = ?7
355                             WHERE scope_id = ?1 AND replay_key = ?2",
356                            params![
357                                scope_id.as_str(),
358                                replay_key.as_str(),
359                                owner_id.as_str(),
360                                lease_token.as_str(),
361                                current_epoch_ms().saturating_add(lease_ttl_ms) as i64,
362                                due_at_param,
363                                current_epoch_ms() as i64,
364                            ],
365                        )?;
366                        Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
367                            scope_id,
368                            replay_key,
369                            envelope_hash,
370                            lease_token,
371                            due_at_ms,
372                        })))
373                    }
374                    other => Ok(Err(RuntimeEffectControllerError::new(
375                        "sqlite_effect_replay_corrupt_row",
376                        format!("unknown runtime effect replay status `{other}`"),
377                    ))),
378                }
379            })
380            .await
381            .map_err(effect_sqlite_error)?;
382        outcome
383    }
384
385    async fn finalize_effect(
386        &self,
387        claim: &ClaimedEffect,
388        outcome: &Result<RuntimeEffectOutcome, RuntimeEffectControllerError>,
389    ) -> Result<(), RuntimeEffectControllerError> {
390        let (status, outcome_json, error_json) = match outcome {
391            Ok(outcome) => (
392                STATUS_COMPLETED,
393                Some(serde_json::to_string(outcome).map_err(effect_encode_error)?),
394                None,
395            ),
396            Err(err) => (
397                STATUS_FAILED,
398                None,
399                Some(serde_json::to_string(err).map_err(effect_encode_error)?),
400            ),
401        };
402        let now = current_epoch_ms();
403        let scope_id = claim.scope_id.clone();
404        let replay_key = claim.replay_key.clone();
405        let envelope_hash = claim.envelope_hash.clone();
406        let lease_token = claim.lease_token.clone();
407        let status = status.to_string();
408
409        let result: Result<(), RuntimeEffectControllerError> = self
410            .inner
411            .conn
412            .write(move |tx| {
413                let changed = tx.execute(
414                    "UPDATE runtime_effect_replay
415                     SET status = ?5,
416                         outcome_json = ?6,
417                         error_json = ?7,
418                         lease_owner_id = NULL,
419                         lease_token = NULL,
420                         lease_expires_at_ms = 0,
421                         updated_at_ms = ?8
422                     WHERE scope_id = ?1
423                       AND replay_key = ?2
424                       AND envelope_hash = ?3
425                       AND lease_token = ?4
426                       AND status = 'in_progress'",
427                    params![
428                        scope_id.as_str(),
429                        replay_key.as_str(),
430                        envelope_hash.as_str(),
431                        lease_token.as_str(),
432                        status.as_str(),
433                        outcome_json,
434                        error_json,
435                        now as i64,
436                    ],
437                )?;
438                if changed != 1 {
439                    return Ok(Err(RuntimeEffectControllerError::new(
440                        "sqlite_effect_replay_lease_lost",
441                        format!(
442                            "runtime effect replay lease was lost before finalizing scope `{scope_id}` replay key `{replay_key}`"
443                        ),
444                    )));
445                }
446                Ok(Ok(()))
447            })
448            .await
449            .map_err(effect_sqlite_error)?;
450        result
451    }
452
453    async fn execute_claimed_effect(
454        &self,
455        claim: &ClaimedEffect,
456        envelope: RuntimeEffectEnvelope,
457        local_executor: RuntimeEffectLocalExecutor<'_>,
458    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
459        if matches!(envelope.command, RuntimeEffectCommand::Sleep { .. }) {
460            sleep_until_due(claim.due_at_ms).await;
461            return Ok(RuntimeEffectOutcome::Sleep);
462        }
463        match envelope.command {
464            RuntimeEffectCommand::Process { command } => {
465                let result = self
466                    .execute_process_command(command, local_executor)
467                    .await?;
468                Ok(RuntimeEffectOutcome::Process { result })
469            }
470            _ => local_executor.execute(envelope).await,
471        }
472    }
473
474    async fn execute_process_command(
475        &self,
476        command: ProcessCommand,
477        local_executor: RuntimeEffectLocalExecutor<'_>,
478    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
479        let execution = local_executor.into_process()?;
480        let registry = execution.registry;
481        match command {
482            ProcessCommand::Start {
483                registration,
484                grant,
485                execution_context: _,
486            } => {
487                let registration_id = registration.id.clone();
488                let record = registry.register_process(registration).await?;
489                if let Some(grant) = grant {
490                    registry
491                        .grant_handle(&grant.owner_scope, &registration_id, grant.descriptor)
492                        .await?;
493                }
494                Ok(ProcessEffectOutcome::Start { record })
495            }
496            ProcessCommand::List { owner_scope, mode } => {
497                let entries = match mode {
498                    lash_core::ProcessListMode::Live => {
499                        registry.list_live_handle_grants(&owner_scope).await?
500                    }
501                    lash_core::ProcessListMode::All => {
502                        registry.list_handle_grants(&owner_scope).await?
503                    }
504                };
505                Ok(ProcessEffectOutcome::List { entries })
506            }
507            ProcessCommand::Transfer {
508                from_scope,
509                to_scope,
510                process_ids,
511            } => {
512                registry
513                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
514                    .await?;
515                Ok(ProcessEffectOutcome::Transfer)
516            }
517            ProcessCommand::DeleteSession { session_id } => {
518                let report = registry.delete_session_process_state(&session_id).await?;
519                for process_id in &report.cancel_process_ids {
520                    registry
521                        .append_event(
522                            process_id,
523                            lash_core::ProcessEventAppendRequest::cancel_requested(
524                                process_id,
525                                Some("session deleted".to_string()),
526                            ),
527                        )
528                        .await?;
529                }
530                Ok(ProcessEffectOutcome::DeleteSession { report })
531            }
532            ProcessCommand::Await { process_id } => {
533                let output = registry.await_process(&process_id).await?;
534                Ok(ProcessEffectOutcome::Await { output })
535            }
536            ProcessCommand::Cancel { process_id, reason } => {
537                registry
538                    .append_event(
539                        &process_id,
540                        lash_core::ProcessEventAppendRequest::cancel_requested(&process_id, reason),
541                    )
542                    .await?;
543                let record = registry.get_process(&process_id).await.ok_or_else(|| {
544                    PluginError::Session(format!("unknown process `{process_id}`"))
545                })?;
546                Ok(ProcessEffectOutcome::Cancel { record })
547            }
548            ProcessCommand::Signal {
549                process_id,
550                request,
551                ..
552            } => {
553                let result = registry.append_event(&process_id, request).await?;
554                Ok(ProcessEffectOutcome::Signal {
555                    event: result.event,
556                })
557            }
558        }
559    }
560}
561
562#[async_trait::async_trait]
563impl RuntimeEffectController for SqliteRuntimeEffectController {
564    fn durability_tier(&self) -> DurabilityTier {
565        DurabilityTier::Durable
566    }
567
568    fn requires_durable_attachment_store(&self) -> bool {
569        true
570    }
571
572    async fn execute_effect(
573        &self,
574        envelope: RuntimeEffectEnvelope,
575        local_executor: RuntimeEffectLocalExecutor<'_>,
576    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
577        loop {
578            match self.prepare_effect(&envelope).await? {
579                PreparedEffect::ReplayOutcome { outcome, due_at_ms } => {
580                    sleep_until_due(due_at_ms).await;
581                    return Ok(*outcome);
582                }
583                PreparedEffect::ReplayError(err) => return Err(err),
584                PreparedEffect::Claimed(claim) => {
585                    let result = self
586                        .execute_claimed_effect(&claim, envelope, local_executor)
587                        .await;
588                    let finalize = self.finalize_effect(&claim, &result).await;
589                    return match (result, finalize) {
590                        (Ok(outcome), Ok(())) => Ok(outcome),
591                        (Err(err), Ok(())) => Err(err),
592                        (_, Err(err)) => Err(err),
593                    };
594                }
595                PreparedEffect::Busy { retry_at_ms } => {
596                    sleep_until_retry(retry_at_ms).await;
597                }
598            }
599        }
600    }
601}
602
603async fn open_effect_replay_inner(
604    path: &Path,
605    backing: StoreBacking,
606    options: SqliteEffectReplayOptions,
607) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
608    let conn = SqliteConnection::open(path).await?;
609    ensure_effect_schema(&conn).await?;
610    apply_pragmas(&conn, backing).await?;
611    Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
612}
613
614async fn open_effect_replay_memory_inner(
615    options: SqliteEffectReplayOptions,
616) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
617    let conn = SqliteConnection::open_in_memory().await?;
618    ensure_effect_schema(&conn).await?;
619    apply_pragmas(&conn, StoreBacking::Memory).await?;
620    Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
621}
622
623impl SqliteEffectReplayInner {
624    fn new(conn: SqliteConnection, options: SqliteEffectReplayOptions) -> Self {
625        let sequence = EFFECT_OWNER_COUNTER.fetch_add(1, Ordering::SeqCst);
626        Self {
627            conn,
628            owner_id: format!(
629                "pid{}-{sequence}-{}",
630                std::process::id(),
631                current_epoch_ms()
632            ),
633            lease_counter: AtomicU64::new(1),
634            replay_mode: AtomicBool::new(false),
635            lease_ttl_ms: duration_ms(options.lease_ttl),
636        }
637    }
638
639    fn next_lease_token(&self) -> String {
640        let sequence = self.lease_counter.fetch_add(1, Ordering::SeqCst);
641        format!("{}:{sequence}", self.owner_id)
642    }
643}
644
645fn duration_ms(duration: Duration) -> u64 {
646    let millis = duration.as_millis();
647    if millis == 0 {
648        1
649    } else {
650        millis.min(u128::from(u64::MAX)) as u64
651    }
652}
653
654fn sleep_due_at_ms(envelope: &RuntimeEffectEnvelope, now: u64) -> Option<u64> {
655    match envelope.command {
656        RuntimeEffectCommand::Sleep { duration_ms } => Some(now.saturating_add(duration_ms)),
657        _ => None,
658    }
659}
660
661async fn sleep_until_due(due_at_ms: Option<u64>) {
662    let Some(due_at_ms) = due_at_ms else {
663        return;
664    };
665    let now = current_epoch_ms();
666    if due_at_ms > now {
667        tokio::time::sleep(Duration::from_millis(due_at_ms - now)).await;
668    }
669}
670
671async fn sleep_until_retry(retry_at_ms: u64) {
672    let now = current_epoch_ms();
673    let delay = if retry_at_ms > now {
674        Duration::from_millis(retry_at_ms - now).min(BUSY_POLL)
675    } else {
676        BUSY_POLL
677    };
678    tokio::time::sleep(delay).await;
679}
680
681fn effect_sqlite_error(err: rusqlite::Error) -> RuntimeEffectControllerError {
682    RuntimeEffectControllerError::new("sqlite_effect_replay_store", err.to_string())
683}
684
685fn effect_encode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
686    RuntimeEffectControllerError::new(
687        "sqlite_effect_replay_encode",
688        format!("failed to encode runtime effect replay row: {err}"),
689    )
690}
691
692fn effect_decode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
693    RuntimeEffectControllerError::new(
694        "sqlite_effect_replay_decode",
695        format!("failed to decode runtime effect replay row: {err}"),
696    )
697}