Skip to main content

lash_sqlite_store/
effect_replay.rs

1//! SQLite-backed runtime effect replay host (tokio-rusqlite port of the the prior store
2//! store's `effect_replay`).
3//!
4//! The public surface is identical to the prior implementation — same type names
5//! (`SqliteEffectHost`, `SqliteRuntimeEffectController`, `SqliteEffectReplayOptions`),
6//! same async signatures — so consumers swap backends with a path rename only.
7//! The only mechanical change is the database layer: the prior store ran every op directly
8//! on `&Connection` with `.await`; here every database body is a *synchronous*
9//! rusqlite closure handed to the shared [`SqliteConnection`] wrapper. The
10//! claim/finalize paths run inside `conn.write` (`BEGIN IMMEDIATE`) so the
11//! cross-process write lock is taken up front — the lease fence WAL gives us.
12
13use std::path::Path;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::time::Duration;
17
18use lash_core::{
19    AwaitEventResolver, DurabilityTier, EffectHost, ExecutionScope, LeaseTimings,
20    RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
21    RuntimeEffectEnvelope, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeError,
22    ScopedEffectController,
23};
24
25use super::*;
26
27const STATUS_IN_PROGRESS: &str = "in_progress";
28const STATUS_COMPLETED: &str = "completed";
29const STATUS_FAILED: &str = "failed";
30const BUSY_POLL: Duration = Duration::from_millis(25);
31
32static EFFECT_OWNER_COUNTER: AtomicU64 = AtomicU64::new(1);
33
34/// Options for SQLite-backed runtime effect replay.
35#[derive(Clone, Debug, Default)]
36pub struct SqliteEffectReplayOptions {
37    /// Effect-replay lease timing capability. Hosts share the same
38    /// [`LeaseTimings`] they configure on the runtime so effect leases expire
39    /// on the same failover window as session and process leases.
40    pub lease_timings: LeaseTimings,
41}
42
43struct SqliteEffectReplayInner {
44    conn: SqliteConnection,
45    owner_id: String,
46    lease_counter: AtomicU64,
47    replay_mode: AtomicBool,
48    lease_timings: LeaseTimings,
49}
50
51/// Deployment-level SQLite effect host.
52///
53/// This host persists runtime effect history in a local SQLite database and
54/// returns scoped controllers that replay completed outcomes by
55/// `(scope_id, replay_key)`.
56#[derive(Clone)]
57pub struct SqliteEffectHost {
58    inner: Arc<SqliteEffectReplayInner>,
59}
60
61/// Scoped SQLite-backed runtime effect controller.
62#[derive(Clone)]
63pub struct SqliteRuntimeEffectController {
64    inner: Arc<SqliteEffectReplayInner>,
65    scope: ExecutionScope,
66}
67
68struct ClaimedEffect {
69    scope_id: String,
70    replay_key: String,
71    envelope_hash: String,
72    lease_token: String,
73    due_at_ms: Option<u64>,
74}
75
76enum PreparedEffect {
77    ReplayOutcome {
78        outcome: Box<RuntimeEffectOutcome>,
79        due_at_ms: Option<u64>,
80    },
81    ReplayError(RuntimeEffectControllerError),
82    Claimed(ClaimedEffect),
83    Busy {
84        retry_at_ms: u64,
85    },
86}
87
88impl SqliteEffectHost {
89    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
90        Self::open_with_options(path, SqliteEffectReplayOptions::default()).await
91    }
92
93    pub async fn open_with_options(
94        path: &Path,
95        options: SqliteEffectReplayOptions,
96    ) -> tokio_rusqlite::Result<Self> {
97        Ok(Self {
98            inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
99        })
100    }
101
102    pub async fn memory() -> tokio_rusqlite::Result<Self> {
103        Self::memory_with_options(SqliteEffectReplayOptions::default()).await
104    }
105
106    pub async fn memory_with_options(
107        options: SqliteEffectReplayOptions,
108    ) -> tokio_rusqlite::Result<Self> {
109        Ok(Self {
110            inner: open_effect_replay_memory_inner(options).await?,
111        })
112    }
113
114    /// Force strict replay mode: missing effect history fails instead of
115    /// executing locally. Normal operation still replays any completed row.
116    pub fn start_replay(&self) {
117        self.inner.replay_mode.store(true, Ordering::SeqCst);
118    }
119}
120
121impl AwaitEventResolver for SqliteEffectHost {
122    fn durability_tier(&self) -> DurabilityTier {
123        DurabilityTier::Durable
124    }
125
126    fn requires_durable_attachment_store(&self) -> bool {
127        true
128    }
129}
130
131impl EffectHost for SqliteEffectHost {
132    fn scoped<'run>(
133        &'run self,
134        scope: ExecutionScope,
135    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
136        let controller = SqliteRuntimeEffectController {
137            inner: Arc::clone(&self.inner),
138            scope: scope.clone(),
139        };
140        ScopedEffectController::shared(Arc::new(controller), scope)
141    }
142
143    fn scoped_static(
144        &self,
145        scope: ExecutionScope,
146    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
147        let controller = SqliteRuntimeEffectController {
148            inner: Arc::clone(&self.inner),
149            scope: scope.clone(),
150        };
151        Ok(Some(ScopedEffectController::shared(
152            Arc::new(controller),
153            scope,
154        )?))
155    }
156}
157
158impl SqliteRuntimeEffectController {
159    pub async fn open(path: &Path, scope: ExecutionScope) -> tokio_rusqlite::Result<Self> {
160        Self::open_with_options(path, scope, SqliteEffectReplayOptions::default()).await
161    }
162
163    pub async fn open_with_options(
164        path: &Path,
165        scope: ExecutionScope,
166        options: SqliteEffectReplayOptions,
167    ) -> tokio_rusqlite::Result<Self> {
168        Ok(Self {
169            inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
170            scope,
171        })
172    }
173
174    pub async fn memory(scope: ExecutionScope) -> tokio_rusqlite::Result<Self> {
175        Self::memory_with_options(scope, SqliteEffectReplayOptions::default()).await
176    }
177
178    pub async fn memory_with_options(
179        scope: ExecutionScope,
180        options: SqliteEffectReplayOptions,
181    ) -> tokio_rusqlite::Result<Self> {
182        Ok(Self {
183            inner: open_effect_replay_memory_inner(options).await?,
184            scope,
185        })
186    }
187
188    /// Force strict replay mode: missing effect history fails instead of
189    /// executing locally. Normal operation still replays any completed row.
190    pub fn start_replay(&self) {
191        self.inner.replay_mode.store(true, Ordering::SeqCst);
192    }
193
194    async fn prepare_effect(
195        &self,
196        envelope: &RuntimeEffectEnvelope,
197    ) -> Result<PreparedEffect, RuntimeEffectControllerError> {
198        let replay_key = envelope
199            .invocation
200            .replay_key()
201            .ok_or_else(|| {
202                RuntimeEffectControllerError::new(
203                    "sqlite_effect_replay_key_missing",
204                    "runtime effect envelope requires replay.key",
205                )
206            })?
207            .to_string();
208        let envelope_hash = envelope.stable_hash()?;
209        let scope_id = self.scope.id().to_string();
210        let now = current_epoch_ms();
211        let lease_token = self.inner.next_lease_token();
212        let due_at_ms = sleep_due_at_ms(envelope, now);
213        let lease_ttl_ms = self.inner.lease_timings.ttl_ms();
214        let lease_expires_at_ms = now.saturating_add(lease_ttl_ms);
215        let replay_mode = self.inner.replay_mode.load(Ordering::SeqCst);
216        let owner_id = self.inner.owner_id.clone();
217
218        // The `BEGIN IMMEDIATE` transaction is run on the connection thread via
219        // `conn.write`. The closure returns a `rusqlite::Result` carrying our
220        // own outcome `Result<PreparedEffect, RuntimeEffectControllerError>`:
221        // the SQL committing is independent of whether the recorded effect was
222        // a success, a failure, or a fresh claim — exactly the prior behaviour
223        // (commit on `Ok(_)` for any `PreparedEffect` variant). Only a real
224        // SQLite error rolls back.
225        let outcome: Result<PreparedEffect, RuntimeEffectControllerError> = self
226            .inner
227            .conn
228            .write(move |tx| {
229                let row = tx
230                    .query_row(
231                        "SELECT envelope_hash, status, outcome_json, error_json,
232                                lease_owner_id, lease_token, lease_expires_at_ms, due_at_ms
233                         FROM runtime_effect_replay
234                         WHERE scope_id = ?1 AND replay_key = ?2",
235                        params![scope_id.as_str(), replay_key.as_str()],
236                        |row| {
237                            Ok((
238                                row.get::<_, String>(0)?,
239                                row.get::<_, String>(1)?,
240                                row.get::<_, Option<String>>(2)?,
241                                row.get::<_, Option<String>>(3)?,
242                                row.get::<_, i64>(6)?,
243                                row.get::<_, Option<i64>>(7)?,
244                            ))
245                        },
246                    )
247                    .optional()?;
248
249                let Some((
250                    existing_hash,
251                    status,
252                    outcome_json,
253                    error_json,
254                    lease_expires_row,
255                    existing_due_row,
256                )) = row
257                else {
258                    if replay_mode {
259                        return Ok(Err(RuntimeEffectControllerError::new(
260                            "sqlite_effect_replay_missing",
261                            format!(
262                                "no recorded runtime effect for scope `{scope_id}` and replay key `{replay_key}`"
263                            ),
264                        )));
265                    }
266                    let due_at_param = due_at_ms.map(|value| value as i64);
267                    tx.execute(
268                        "INSERT INTO runtime_effect_replay (
269                            scope_id, replay_key, envelope_hash, status, outcome_json,
270                            error_json, lease_owner_id, lease_token, lease_expires_at_ms,
271                            due_at_ms, created_at_ms, updated_at_ms
272                         )
273                         VALUES (?1, ?2, ?3, ?4, NULL, NULL, ?5, ?6, ?7, ?8, ?9, ?10)",
274                        params![
275                            scope_id.as_str(),
276                            replay_key.as_str(),
277                            envelope_hash.as_str(),
278                            STATUS_IN_PROGRESS,
279                            owner_id.as_str(),
280                            lease_token.as_str(),
281                            lease_expires_at_ms as i64,
282                            due_at_param,
283                            now as i64,
284                            now as i64,
285                        ],
286                    )?;
287                    return Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
288                        scope_id,
289                        replay_key,
290                        envelope_hash,
291                        lease_token,
292                        due_at_ms,
293                    })));
294                };
295
296                if existing_hash != envelope_hash {
297                    return Ok(Err(RuntimeEffectControllerError::new(
298                        "sqlite_effect_replay_hash_conflict",
299                        format!(
300                            "runtime effect replay key `{replay_key}` in scope `{scope_id}` was reused with a different envelope hash"
301                        ),
302                    )));
303                }
304
305                let lease_expires_at_ms = lease_expires_row as u64;
306                let existing_due_at_ms = existing_due_row.map(|value| value as u64);
307
308                match status.as_str() {
309                    STATUS_COMPLETED => {
310                        let Some(json) = outcome_json else {
311                            return Ok(Err(RuntimeEffectControllerError::new(
312                                "sqlite_effect_replay_corrupt_row",
313                                "completed runtime effect row is missing outcome_json",
314                            )));
315                        };
316                        let outcome = match serde_json::from_str(&json) {
317                            Ok(outcome) => outcome,
318                            Err(err) => return Ok(Err(effect_decode_error(err))),
319                        };
320                        Ok(Ok(PreparedEffect::ReplayOutcome {
321                            outcome: Box::new(outcome),
322                            due_at_ms: existing_due_at_ms,
323                        }))
324                    }
325                    STATUS_FAILED => {
326                        let Some(json) = error_json else {
327                            return Ok(Err(RuntimeEffectControllerError::new(
328                                "sqlite_effect_replay_corrupt_row",
329                                "failed runtime effect row is missing error_json",
330                            )));
331                        };
332                        let err = match serde_json::from_str(&json) {
333                            Ok(err) => err,
334                            Err(err) => return Ok(Err(effect_decode_error(err))),
335                        };
336                        Ok(Ok(PreparedEffect::ReplayError(err)))
337                    }
338                    STATUS_IN_PROGRESS if lease_expires_at_ms > now => Ok(Ok(PreparedEffect::Busy {
339                        retry_at_ms: lease_expires_at_ms,
340                    })),
341                    STATUS_IN_PROGRESS => {
342                        let due_at_ms = existing_due_at_ms.or(due_at_ms);
343                        let due_at_param = due_at_ms.map(|value| value as i64);
344                        tx.execute(
345                            "UPDATE runtime_effect_replay
346                             SET lease_owner_id = ?3,
347                                 lease_token = ?4,
348                                 lease_expires_at_ms = ?5,
349                                 due_at_ms = ?6,
350                                 updated_at_ms = ?7
351                             WHERE scope_id = ?1 AND replay_key = ?2",
352                            params![
353                                scope_id.as_str(),
354                                replay_key.as_str(),
355                                owner_id.as_str(),
356                                lease_token.as_str(),
357                                current_epoch_ms().saturating_add(lease_ttl_ms) as i64,
358                                due_at_param,
359                                current_epoch_ms() as i64,
360                            ],
361                        )?;
362                        Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
363                            scope_id,
364                            replay_key,
365                            envelope_hash,
366                            lease_token,
367                            due_at_ms,
368                        })))
369                    }
370                    other => Ok(Err(RuntimeEffectControllerError::new(
371                        "sqlite_effect_replay_corrupt_row",
372                        format!("unknown runtime effect replay status `{other}`"),
373                    ))),
374                }
375            })
376            .await
377            .map_err(effect_sqlite_error)?;
378        outcome
379    }
380
381    async fn finalize_effect(
382        &self,
383        claim: &ClaimedEffect,
384        outcome: &Result<RuntimeEffectOutcome, RuntimeEffectControllerError>,
385    ) -> Result<(), RuntimeEffectControllerError> {
386        let (status, outcome_json, error_json) = match outcome {
387            Ok(outcome) => (
388                STATUS_COMPLETED,
389                Some(serde_json::to_string(outcome).map_err(effect_encode_error)?),
390                None,
391            ),
392            Err(err) => (
393                STATUS_FAILED,
394                None,
395                Some(serde_json::to_string(err).map_err(effect_encode_error)?),
396            ),
397        };
398        let now = current_epoch_ms();
399        let scope_id = claim.scope_id.clone();
400        let replay_key = claim.replay_key.clone();
401        let envelope_hash = claim.envelope_hash.clone();
402        let owner_id = self.inner.owner_id.clone();
403        let lease_token = claim.lease_token.clone();
404        let status = status.to_string();
405
406        let result: Result<(), RuntimeEffectControllerError> = self
407            .inner
408            .conn
409            .write(move |tx| {
410                let changed = tx.execute(
411                    "UPDATE runtime_effect_replay
412                     SET status = ?6,
413                         outcome_json = ?7,
414                         error_json = ?8,
415                         lease_owner_id = NULL,
416                         lease_token = NULL,
417                         lease_expires_at_ms = 0,
418                         updated_at_ms = ?9
419                     WHERE scope_id = ?1
420                       AND replay_key = ?2
421                       AND envelope_hash = ?3
422                       AND lease_owner_id = ?4
423                       AND lease_token = ?5
424                       AND status = 'in_progress'
425                       AND lease_expires_at_ms > ?10",
426                    params![
427                        scope_id.as_str(),
428                        replay_key.as_str(),
429                        envelope_hash.as_str(),
430                        owner_id.as_str(),
431                        lease_token.as_str(),
432                        status.as_str(),
433                        outcome_json,
434                        error_json,
435                        now as i64,
436                        now as i64,
437                    ],
438                )?;
439                if changed != 1 {
440                    return Ok(Err(RuntimeEffectControllerError::new(
441                        "sqlite_effect_replay_lease_lost",
442                        format!(
443                            "runtime effect replay lease was lost before finalizing scope `{scope_id}` replay key `{replay_key}`"
444                        ),
445                    )));
446                }
447                Ok(Ok(()))
448            })
449            .await
450            .map_err(effect_sqlite_error)?;
451        result
452    }
453
454    async fn renew_effect_lease(
455        &self,
456        claim: &ClaimedEffect,
457    ) -> Result<(), RuntimeEffectControllerError> {
458        let now = current_epoch_ms();
459        let renewed_expires_at = now.saturating_add(self.inner.lease_timings.ttl_ms());
460        let scope_id = claim.scope_id.clone();
461        let replay_key = claim.replay_key.clone();
462        let envelope_hash = claim.envelope_hash.clone();
463        let owner_id = self.inner.owner_id.clone();
464        let lease_token = claim.lease_token.clone();
465
466        let result: Result<(), RuntimeEffectControllerError> = self
467            .inner
468            .conn
469            .write(move |tx| {
470                let changed = tx.execute(
471                    "UPDATE runtime_effect_replay
472                     SET lease_expires_at_ms = ?6,
473                         updated_at_ms = ?7
474                     WHERE scope_id = ?1
475                       AND replay_key = ?2
476                       AND envelope_hash = ?3
477                       AND lease_owner_id = ?4
478                       AND lease_token = ?5
479                       AND status = 'in_progress'
480                       AND lease_expires_at_ms > ?8",
481                    params![
482                        scope_id.as_str(),
483                        replay_key.as_str(),
484                        envelope_hash.as_str(),
485                        owner_id.as_str(),
486                        lease_token.as_str(),
487                        renewed_expires_at as i64,
488                        now as i64,
489                        now as i64,
490                    ],
491                )?;
492                if changed != 1 {
493                    return Ok(Err(RuntimeEffectControllerError::new(
494                        "sqlite_effect_replay_lease_lost",
495                        format!(
496                            "runtime effect replay lease was lost while executing scope `{scope_id}` replay key `{replay_key}`"
497                        ),
498                    )));
499                }
500                Ok(Ok(()))
501            })
502            .await
503            .map_err(effect_sqlite_error)?;
504        result
505    }
506
507    async fn execute_claimed_effect_with_renewal(
508        &self,
509        claim: &ClaimedEffect,
510        envelope: RuntimeEffectEnvelope,
511        local_executor: RuntimeEffectLocalExecutor<'_>,
512    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
513        let renew_every = self.inner.lease_timings.renew_interval();
514        let effect = self.execute_claimed_effect(claim, envelope, local_executor);
515        tokio::pin!(effect);
516        let renew_sleep = tokio::time::sleep(renew_every);
517        tokio::pin!(renew_sleep);
518
519        loop {
520            tokio::select! {
521                result = &mut effect => return result,
522                _ = &mut renew_sleep => {
523                    self.renew_effect_lease(claim).await?;
524                    renew_sleep.as_mut().reset(tokio::time::Instant::now() + renew_every);
525                }
526            }
527        }
528    }
529
530    async fn execute_claimed_effect(
531        &self,
532        claim: &ClaimedEffect,
533        envelope: RuntimeEffectEnvelope,
534        local_executor: RuntimeEffectLocalExecutor<'_>,
535    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
536        if matches!(envelope.command, RuntimeEffectCommand::Sleep { .. }) {
537            sleep_until_due(claim.due_at_ms).await;
538            return Ok(RuntimeEffectOutcome::Sleep);
539        }
540        match envelope.command {
541            RuntimeEffectCommand::Process { command } => {
542                let result = local_executor.into_process()?.execute(*command).await?;
543                Ok(RuntimeEffectOutcome::Process { result })
544            }
545            _ => local_executor.execute(envelope).await,
546        }
547    }
548}
549
550impl AwaitEventResolver for SqliteRuntimeEffectController {
551    fn durability_tier(&self) -> DurabilityTier {
552        DurabilityTier::Durable
553    }
554
555    fn requires_durable_attachment_store(&self) -> bool {
556        true
557    }
558
559    fn supports_durable_effects(&self) -> bool {
560        true
561    }
562}
563
564#[async_trait::async_trait]
565impl RuntimeEffectController for SqliteRuntimeEffectController {
566    async fn execute_effect(
567        &self,
568        envelope: RuntimeEffectEnvelope,
569        local_executor: RuntimeEffectLocalExecutor<'_>,
570    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
571        loop {
572            match self.prepare_effect(&envelope).await? {
573                PreparedEffect::ReplayOutcome { outcome, due_at_ms } => {
574                    sleep_until_due(due_at_ms).await;
575                    return Ok(*outcome);
576                }
577                PreparedEffect::ReplayError(err) => return Err(err),
578                PreparedEffect::Claimed(claim) => {
579                    let result = self
580                        .execute_claimed_effect_with_renewal(&claim, envelope, local_executor)
581                        .await;
582                    let finalize = self.finalize_effect(&claim, &result).await;
583                    return match (result, finalize) {
584                        (Ok(outcome), Ok(())) => Ok(outcome),
585                        (Err(err), Ok(())) => Err(err),
586                        (_, Err(err)) => Err(err),
587                    };
588                }
589                PreparedEffect::Busy { retry_at_ms } => {
590                    sleep_until_retry(retry_at_ms).await;
591                }
592            }
593        }
594    }
595}
596
597async fn open_effect_replay_inner(
598    path: &Path,
599    backing: StoreBacking,
600    options: SqliteEffectReplayOptions,
601) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
602    let conn = SqliteConnection::open(path).await?;
603    ensure_effect_schema(&conn).await?;
604    apply_pragmas(&conn, backing).await?;
605    Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
606}
607
608async fn open_effect_replay_memory_inner(
609    options: SqliteEffectReplayOptions,
610) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
611    let conn = SqliteConnection::open_in_memory().await?;
612    ensure_effect_schema(&conn).await?;
613    apply_pragmas(&conn, StoreBacking::Memory).await?;
614    Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
615}
616
617impl SqliteEffectReplayInner {
618    fn new(conn: SqliteConnection, options: SqliteEffectReplayOptions) -> Self {
619        let sequence = EFFECT_OWNER_COUNTER.fetch_add(1, Ordering::SeqCst);
620        Self {
621            conn,
622            owner_id: format!(
623                "pid{}-{sequence}-{}",
624                std::process::id(),
625                current_epoch_ms()
626            ),
627            lease_counter: AtomicU64::new(1),
628            replay_mode: AtomicBool::new(false),
629            lease_timings: options.lease_timings,
630        }
631    }
632
633    fn next_lease_token(&self) -> String {
634        let sequence = self.lease_counter.fetch_add(1, Ordering::SeqCst);
635        format!("{}:{sequence}", self.owner_id)
636    }
637}
638
639fn sleep_due_at_ms(envelope: &RuntimeEffectEnvelope, now: u64) -> Option<u64> {
640    match envelope.command {
641        RuntimeEffectCommand::Sleep { duration_ms } => Some(now.saturating_add(duration_ms)),
642        _ => None,
643    }
644}
645
646async fn sleep_until_due(due_at_ms: Option<u64>) {
647    let Some(due_at_ms) = due_at_ms else {
648        return;
649    };
650    let now = current_epoch_ms();
651    if due_at_ms > now {
652        tokio::time::sleep(Duration::from_millis(due_at_ms - now)).await;
653    }
654}
655
656async fn sleep_until_retry(retry_at_ms: u64) {
657    let now = current_epoch_ms();
658    let delay = if retry_at_ms > now {
659        Duration::from_millis(retry_at_ms - now).min(BUSY_POLL)
660    } else {
661        BUSY_POLL
662    };
663    tokio::time::sleep(delay).await;
664}
665
666fn effect_sqlite_error(err: rusqlite::Error) -> RuntimeEffectControllerError {
667    RuntimeEffectControllerError::new("sqlite_effect_replay_store", err.to_string())
668}
669
670fn effect_encode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
671    RuntimeEffectControllerError::new(
672        "sqlite_effect_replay_encode",
673        format!("failed to encode runtime effect replay row: {err}"),
674    )
675}
676
677fn effect_decode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
678    RuntimeEffectControllerError::new(
679        "sqlite_effect_replay_decode",
680        format!("failed to decode runtime effect replay row: {err}"),
681    )
682}