Skip to main content

lash_sqlite_store/
effect_replay.rs

1//! SQLite-backed runtime effect replay host (tokio-rusqlite port of the the prior store
2//! store's `effect_replay`).
3//!
4//! The public surface is identical to the prior implementation — same type names
5//! (`SqliteEffectHost`, `SqliteRuntimeEffectController`, `SqliteEffectReplayOptions`),
6//! same async signatures — so consumers swap backends with a path rename only.
7//! The only mechanical change is the database layer: the prior store ran every op directly
8//! on `&Connection` with `.await`; here every database body is a *synchronous*
9//! rusqlite closure handed to the shared [`SqliteConnection`] wrapper. The
10//! claim/finalize paths run inside `conn.write` (`BEGIN IMMEDIATE`) so the
11//! cross-process write lock is taken up front — the lease fence WAL gives us.
12
13use std::path::Path;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::time::Duration;
17
18use lash_core::{
19    DurabilityTier, EffectHost, ExecutionScope, PluginError, ProcessCommand, ProcessEffectOutcome,
20    RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
21    RuntimeEffectEnvelope, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeError,
22    ScopedEffectController,
23};
24
25use super::*;
26
27const STATUS_IN_PROGRESS: &str = "in_progress";
28const STATUS_COMPLETED: &str = "completed";
29const STATUS_FAILED: &str = "failed";
30const DEFAULT_LEASE_TTL: Duration = Duration::from_secs(30);
31const BUSY_POLL: Duration = Duration::from_millis(25);
32
33static EFFECT_OWNER_COUNTER: AtomicU64 = AtomicU64::new(1);
34
35/// Options for SQLite-backed runtime effect replay.
36#[derive(Clone, Debug)]
37pub struct SqliteEffectReplayOptions {
38    pub lease_ttl: Duration,
39}
40
41impl Default for SqliteEffectReplayOptions {
42    fn default() -> Self {
43        Self {
44            lease_ttl: DEFAULT_LEASE_TTL,
45        }
46    }
47}
48
49struct SqliteEffectReplayInner {
50    conn: SqliteConnection,
51    owner_id: String,
52    lease_counter: AtomicU64,
53    replay_mode: AtomicBool,
54    lease_ttl_ms: u64,
55}
56
57/// Deployment-level SQLite effect host.
58///
59/// This host persists runtime effect history in a local SQLite database and
60/// returns scoped controllers that replay completed outcomes by
61/// `(scope_id, replay_key)`.
62#[derive(Clone)]
63pub struct SqliteEffectHost {
64    inner: Arc<SqliteEffectReplayInner>,
65}
66
67/// Scoped SQLite-backed runtime effect controller.
68#[derive(Clone)]
69pub struct SqliteRuntimeEffectController {
70    inner: Arc<SqliteEffectReplayInner>,
71    scope: ExecutionScope,
72}
73
74struct ClaimedEffect {
75    scope_id: String,
76    replay_key: String,
77    envelope_hash: String,
78    lease_token: String,
79    due_at_ms: Option<u64>,
80}
81
82enum PreparedEffect {
83    ReplayOutcome {
84        outcome: Box<RuntimeEffectOutcome>,
85        due_at_ms: Option<u64>,
86    },
87    ReplayError(RuntimeEffectControllerError),
88    Claimed(ClaimedEffect),
89    Busy {
90        retry_at_ms: u64,
91    },
92}
93
94impl SqliteEffectHost {
95    pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
96        Self::open_with_options(path, SqliteEffectReplayOptions::default()).await
97    }
98
99    pub async fn open_with_options(
100        path: &Path,
101        options: SqliteEffectReplayOptions,
102    ) -> tokio_rusqlite::Result<Self> {
103        Ok(Self {
104            inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
105        })
106    }
107
108    pub async fn memory() -> tokio_rusqlite::Result<Self> {
109        Self::memory_with_options(SqliteEffectReplayOptions::default()).await
110    }
111
112    pub async fn memory_with_options(
113        options: SqliteEffectReplayOptions,
114    ) -> tokio_rusqlite::Result<Self> {
115        Ok(Self {
116            inner: open_effect_replay_memory_inner(options).await?,
117        })
118    }
119
120    /// Force strict replay mode: missing effect history fails instead of
121    /// executing locally. Normal operation still replays any completed row.
122    pub fn start_replay(&self) {
123        self.inner.replay_mode.store(true, Ordering::SeqCst);
124    }
125}
126
127impl EffectHost for SqliteEffectHost {
128    fn durability_tier(&self) -> DurabilityTier {
129        DurabilityTier::Durable
130    }
131
132    fn requires_durable_attachment_store(&self) -> bool {
133        true
134    }
135
136    fn scoped<'run>(
137        &'run self,
138        scope: ExecutionScope,
139    ) -> Result<ScopedEffectController<'run>, RuntimeError> {
140        let controller = SqliteRuntimeEffectController {
141            inner: Arc::clone(&self.inner),
142            scope: scope.clone(),
143        };
144        ScopedEffectController::shared(Arc::new(controller), scope)
145    }
146
147    fn scoped_static(
148        &self,
149        scope: ExecutionScope,
150    ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
151        let controller = SqliteRuntimeEffectController {
152            inner: Arc::clone(&self.inner),
153            scope: scope.clone(),
154        };
155        Ok(Some(ScopedEffectController::shared(
156            Arc::new(controller),
157            scope,
158        )?))
159    }
160}
161
162impl SqliteRuntimeEffectController {
163    pub async fn open(path: &Path, scope: ExecutionScope) -> tokio_rusqlite::Result<Self> {
164        Self::open_with_options(path, scope, SqliteEffectReplayOptions::default()).await
165    }
166
167    pub async fn open_with_options(
168        path: &Path,
169        scope: ExecutionScope,
170        options: SqliteEffectReplayOptions,
171    ) -> tokio_rusqlite::Result<Self> {
172        Ok(Self {
173            inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
174            scope,
175        })
176    }
177
178    pub async fn memory(scope: ExecutionScope) -> tokio_rusqlite::Result<Self> {
179        Self::memory_with_options(scope, SqliteEffectReplayOptions::default()).await
180    }
181
182    pub async fn memory_with_options(
183        scope: ExecutionScope,
184        options: SqliteEffectReplayOptions,
185    ) -> tokio_rusqlite::Result<Self> {
186        Ok(Self {
187            inner: open_effect_replay_memory_inner(options).await?,
188            scope,
189        })
190    }
191
192    /// Force strict replay mode: missing effect history fails instead of
193    /// executing locally. Normal operation still replays any completed row.
194    pub fn start_replay(&self) {
195        self.inner.replay_mode.store(true, Ordering::SeqCst);
196    }
197
198    async fn prepare_effect(
199        &self,
200        envelope: &RuntimeEffectEnvelope,
201    ) -> Result<PreparedEffect, RuntimeEffectControllerError> {
202        let replay_key = envelope
203            .invocation
204            .replay_key()
205            .ok_or_else(|| {
206                RuntimeEffectControllerError::new(
207                    "sqlite_effect_replay_key_missing",
208                    "runtime effect envelope requires replay.key",
209                )
210            })?
211            .to_string();
212        let envelope_hash = envelope.stable_hash()?;
213        let scope_id = self.scope.id().to_string();
214        let now = current_epoch_ms();
215        let lease_token = self.inner.next_lease_token();
216        let due_at_ms = sleep_due_at_ms(envelope, now);
217        let lease_expires_at_ms = now.saturating_add(self.inner.lease_ttl_ms);
218        let replay_mode = self.inner.replay_mode.load(Ordering::SeqCst);
219        let owner_id = self.inner.owner_id.clone();
220        let lease_ttl_ms = self.inner.lease_ttl_ms;
221
222        // The `BEGIN IMMEDIATE` transaction is run on the connection thread via
223        // `conn.write`. The closure returns a `rusqlite::Result` carrying our
224        // own outcome `Result<PreparedEffect, RuntimeEffectControllerError>`:
225        // the SQL committing is independent of whether the recorded effect was
226        // a success, a failure, or a fresh claim — exactly the prior behaviour
227        // (commit on `Ok(_)` for any `PreparedEffect` variant). Only a real
228        // SQLite error rolls back.
229        let outcome: Result<PreparedEffect, RuntimeEffectControllerError> = self
230            .inner
231            .conn
232            .write(move |tx| {
233                let row = tx
234                    .query_row(
235                        "SELECT envelope_hash, status, outcome_json, error_json,
236                                lease_owner_id, lease_token, lease_expires_at_ms, due_at_ms
237                         FROM runtime_effect_replay
238                         WHERE scope_id = ?1 AND replay_key = ?2",
239                        params![scope_id.as_str(), replay_key.as_str()],
240                        |row| {
241                            Ok((
242                                row.get::<_, String>(0)?,
243                                row.get::<_, String>(1)?,
244                                row.get::<_, Option<String>>(2)?,
245                                row.get::<_, Option<String>>(3)?,
246                                row.get::<_, i64>(6)?,
247                                row.get::<_, Option<i64>>(7)?,
248                            ))
249                        },
250                    )
251                    .optional()?;
252
253                let Some((
254                    existing_hash,
255                    status,
256                    outcome_json,
257                    error_json,
258                    lease_expires_row,
259                    existing_due_row,
260                )) = row
261                else {
262                    if replay_mode {
263                        return Ok(Err(RuntimeEffectControllerError::new(
264                            "sqlite_effect_replay_missing",
265                            format!(
266                                "no recorded runtime effect for scope `{scope_id}` and replay key `{replay_key}`"
267                            ),
268                        )));
269                    }
270                    let due_at_param = due_at_ms.map(|value| value as i64);
271                    tx.execute(
272                        "INSERT INTO runtime_effect_replay (
273                            scope_id, replay_key, envelope_hash, status, outcome_json,
274                            error_json, lease_owner_id, lease_token, lease_expires_at_ms,
275                            due_at_ms, created_at_ms, updated_at_ms
276                         )
277                         VALUES (?1, ?2, ?3, ?4, NULL, NULL, ?5, ?6, ?7, ?8, ?9, ?10)",
278                        params![
279                            scope_id.as_str(),
280                            replay_key.as_str(),
281                            envelope_hash.as_str(),
282                            STATUS_IN_PROGRESS,
283                            owner_id.as_str(),
284                            lease_token.as_str(),
285                            lease_expires_at_ms as i64,
286                            due_at_param,
287                            now as i64,
288                            now as i64,
289                        ],
290                    )?;
291                    return Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
292                        scope_id,
293                        replay_key,
294                        envelope_hash,
295                        lease_token,
296                        due_at_ms,
297                    })));
298                };
299
300                if existing_hash != envelope_hash {
301                    return Ok(Err(RuntimeEffectControllerError::new(
302                        "sqlite_effect_replay_hash_conflict",
303                        format!(
304                            "runtime effect replay key `{replay_key}` in scope `{scope_id}` was reused with a different envelope hash"
305                        ),
306                    )));
307                }
308
309                let lease_expires_at_ms = lease_expires_row as u64;
310                let existing_due_at_ms = existing_due_row.map(|value| value as u64);
311
312                match status.as_str() {
313                    STATUS_COMPLETED => {
314                        let Some(json) = outcome_json else {
315                            return Ok(Err(RuntimeEffectControllerError::new(
316                                "sqlite_effect_replay_corrupt_row",
317                                "completed runtime effect row is missing outcome_json",
318                            )));
319                        };
320                        let outcome = match serde_json::from_str(&json) {
321                            Ok(outcome) => outcome,
322                            Err(err) => return Ok(Err(effect_decode_error(err))),
323                        };
324                        Ok(Ok(PreparedEffect::ReplayOutcome {
325                            outcome: Box::new(outcome),
326                            due_at_ms: existing_due_at_ms,
327                        }))
328                    }
329                    STATUS_FAILED => {
330                        let Some(json) = error_json else {
331                            return Ok(Err(RuntimeEffectControllerError::new(
332                                "sqlite_effect_replay_corrupt_row",
333                                "failed runtime effect row is missing error_json",
334                            )));
335                        };
336                        let err = match serde_json::from_str(&json) {
337                            Ok(err) => err,
338                            Err(err) => return Ok(Err(effect_decode_error(err))),
339                        };
340                        Ok(Ok(PreparedEffect::ReplayError(err)))
341                    }
342                    STATUS_IN_PROGRESS if lease_expires_at_ms > now => Ok(Ok(PreparedEffect::Busy {
343                        retry_at_ms: lease_expires_at_ms,
344                    })),
345                    STATUS_IN_PROGRESS => {
346                        let due_at_ms = existing_due_at_ms.or(due_at_ms);
347                        let due_at_param = due_at_ms.map(|value| value as i64);
348                        tx.execute(
349                            "UPDATE runtime_effect_replay
350                             SET lease_owner_id = ?3,
351                                 lease_token = ?4,
352                                 lease_expires_at_ms = ?5,
353                                 due_at_ms = ?6,
354                                 updated_at_ms = ?7
355                             WHERE scope_id = ?1 AND replay_key = ?2",
356                            params![
357                                scope_id.as_str(),
358                                replay_key.as_str(),
359                                owner_id.as_str(),
360                                lease_token.as_str(),
361                                current_epoch_ms().saturating_add(lease_ttl_ms) as i64,
362                                due_at_param,
363                                current_epoch_ms() as i64,
364                            ],
365                        )?;
366                        Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
367                            scope_id,
368                            replay_key,
369                            envelope_hash,
370                            lease_token,
371                            due_at_ms,
372                        })))
373                    }
374                    other => Ok(Err(RuntimeEffectControllerError::new(
375                        "sqlite_effect_replay_corrupt_row",
376                        format!("unknown runtime effect replay status `{other}`"),
377                    ))),
378                }
379            })
380            .await
381            .map_err(effect_sqlite_error)?;
382        outcome
383    }
384
385    async fn finalize_effect(
386        &self,
387        claim: &ClaimedEffect,
388        outcome: &Result<RuntimeEffectOutcome, RuntimeEffectControllerError>,
389    ) -> Result<(), RuntimeEffectControllerError> {
390        let (status, outcome_json, error_json) = match outcome {
391            Ok(outcome) => (
392                STATUS_COMPLETED,
393                Some(serde_json::to_string(outcome).map_err(effect_encode_error)?),
394                None,
395            ),
396            Err(err) => (
397                STATUS_FAILED,
398                None,
399                Some(serde_json::to_string(err).map_err(effect_encode_error)?),
400            ),
401        };
402        let now = current_epoch_ms();
403        let scope_id = claim.scope_id.clone();
404        let replay_key = claim.replay_key.clone();
405        let envelope_hash = claim.envelope_hash.clone();
406        let owner_id = self.inner.owner_id.clone();
407        let lease_token = claim.lease_token.clone();
408        let status = status.to_string();
409
410        let result: Result<(), RuntimeEffectControllerError> = self
411            .inner
412            .conn
413            .write(move |tx| {
414                let changed = tx.execute(
415                    "UPDATE runtime_effect_replay
416                     SET status = ?6,
417                         outcome_json = ?7,
418                         error_json = ?8,
419                         lease_owner_id = NULL,
420                         lease_token = NULL,
421                         lease_expires_at_ms = 0,
422                         updated_at_ms = ?9
423                     WHERE scope_id = ?1
424                       AND replay_key = ?2
425                       AND envelope_hash = ?3
426                       AND lease_owner_id = ?4
427                       AND lease_token = ?5
428                       AND status = 'in_progress'
429                       AND lease_expires_at_ms > ?10",
430                    params![
431                        scope_id.as_str(),
432                        replay_key.as_str(),
433                        envelope_hash.as_str(),
434                        owner_id.as_str(),
435                        lease_token.as_str(),
436                        status.as_str(),
437                        outcome_json,
438                        error_json,
439                        now as i64,
440                        now as i64,
441                    ],
442                )?;
443                if changed != 1 {
444                    return Ok(Err(RuntimeEffectControllerError::new(
445                        "sqlite_effect_replay_lease_lost",
446                        format!(
447                            "runtime effect replay lease was lost before finalizing scope `{scope_id}` replay key `{replay_key}`"
448                        ),
449                    )));
450                }
451                Ok(Ok(()))
452            })
453            .await
454            .map_err(effect_sqlite_error)?;
455        result
456    }
457
458    async fn renew_effect_lease(
459        &self,
460        claim: &ClaimedEffect,
461    ) -> Result<(), RuntimeEffectControllerError> {
462        let now = current_epoch_ms();
463        let renewed_expires_at = now.saturating_add(self.inner.lease_ttl_ms);
464        let scope_id = claim.scope_id.clone();
465        let replay_key = claim.replay_key.clone();
466        let envelope_hash = claim.envelope_hash.clone();
467        let owner_id = self.inner.owner_id.clone();
468        let lease_token = claim.lease_token.clone();
469
470        let result: Result<(), RuntimeEffectControllerError> = self
471            .inner
472            .conn
473            .write(move |tx| {
474                let changed = tx.execute(
475                    "UPDATE runtime_effect_replay
476                     SET lease_expires_at_ms = ?6,
477                         updated_at_ms = ?7
478                     WHERE scope_id = ?1
479                       AND replay_key = ?2
480                       AND envelope_hash = ?3
481                       AND lease_owner_id = ?4
482                       AND lease_token = ?5
483                       AND status = 'in_progress'
484                       AND lease_expires_at_ms > ?8",
485                    params![
486                        scope_id.as_str(),
487                        replay_key.as_str(),
488                        envelope_hash.as_str(),
489                        owner_id.as_str(),
490                        lease_token.as_str(),
491                        renewed_expires_at as i64,
492                        now as i64,
493                        now as i64,
494                    ],
495                )?;
496                if changed != 1 {
497                    return Ok(Err(RuntimeEffectControllerError::new(
498                        "sqlite_effect_replay_lease_lost",
499                        format!(
500                            "runtime effect replay lease was lost while executing scope `{scope_id}` replay key `{replay_key}`"
501                        ),
502                    )));
503                }
504                Ok(Ok(()))
505            })
506            .await
507            .map_err(effect_sqlite_error)?;
508        result
509    }
510
511    async fn execute_claimed_effect_with_renewal(
512        &self,
513        claim: &ClaimedEffect,
514        envelope: RuntimeEffectEnvelope,
515        local_executor: RuntimeEffectLocalExecutor<'_>,
516    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
517        let renew_every = Duration::from_millis((self.inner.lease_ttl_ms / 3).max(1));
518        let effect = self.execute_claimed_effect(claim, envelope, local_executor);
519        tokio::pin!(effect);
520        let renew_sleep = tokio::time::sleep(renew_every);
521        tokio::pin!(renew_sleep);
522
523        loop {
524            tokio::select! {
525                result = &mut effect => return result,
526                _ = &mut renew_sleep => {
527                    self.renew_effect_lease(claim).await?;
528                    renew_sleep.as_mut().reset(tokio::time::Instant::now() + renew_every);
529                }
530            }
531        }
532    }
533
534    async fn execute_claimed_effect(
535        &self,
536        claim: &ClaimedEffect,
537        envelope: RuntimeEffectEnvelope,
538        local_executor: RuntimeEffectLocalExecutor<'_>,
539    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
540        if matches!(envelope.command, RuntimeEffectCommand::Sleep { .. }) {
541            sleep_until_due(claim.due_at_ms).await;
542            return Ok(RuntimeEffectOutcome::Sleep);
543        }
544        match envelope.command {
545            RuntimeEffectCommand::Process { command } => {
546                let result = self
547                    .execute_process_command(*command, local_executor)
548                    .await?;
549                Ok(RuntimeEffectOutcome::Process { result })
550            }
551            _ => local_executor.execute(envelope).await,
552        }
553    }
554
555    async fn execute_process_command(
556        &self,
557        command: ProcessCommand,
558        local_executor: RuntimeEffectLocalExecutor<'_>,
559    ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
560        let execution = local_executor.into_process()?;
561        let registry = execution.registry;
562        match command {
563            ProcessCommand::Start {
564                registration,
565                grant,
566                execution_context: _,
567            } => {
568                let registration_id = registration.id.clone();
569                let record = registry.register_process(registration).await?;
570                if let Some(grant) = grant {
571                    registry
572                        .grant_handle(&grant.session_scope, &registration_id, grant.descriptor)
573                        .await?;
574                }
575                Ok(ProcessEffectOutcome::Start { record })
576            }
577            ProcessCommand::List {
578                session_scope,
579                mode,
580            } => {
581                let entries = match mode {
582                    lash_core::ProcessListMode::Live => {
583                        registry.list_live_handle_grants(&session_scope).await?
584                    }
585                    lash_core::ProcessListMode::All => {
586                        registry.list_handle_grants(&session_scope).await?
587                    }
588                };
589                Ok(ProcessEffectOutcome::List { entries })
590            }
591            ProcessCommand::Transfer {
592                from_scope,
593                to_scope,
594                process_ids,
595            } => {
596                registry
597                    .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
598                    .await?;
599                Ok(ProcessEffectOutcome::Transfer)
600            }
601            ProcessCommand::DeleteSession { session_id } => {
602                let report = registry.delete_session_process_state(&session_id).await?;
603                Ok(ProcessEffectOutcome::DeleteSession { report })
604            }
605            ProcessCommand::Await { process_id } => {
606                let output = registry.await_process(&process_id).await?;
607                Ok(ProcessEffectOutcome::Await { output })
608            }
609            ProcessCommand::Cancel { process_id, reason } => {
610                registry
611                    .append_event(
612                        &process_id,
613                        lash_core::ProcessEventAppendRequest::cancel_requested(&process_id, reason),
614                    )
615                    .await?;
616                let record = registry.get_process(&process_id).await.ok_or_else(|| {
617                    PluginError::Session(format!("unknown process `{process_id}`"))
618                })?;
619                Ok(ProcessEffectOutcome::Cancel { record })
620            }
621            ProcessCommand::Signal {
622                process_id,
623                request,
624                ..
625            } => {
626                let result = registry.append_event(&process_id, request).await?;
627                Ok(ProcessEffectOutcome::Signal {
628                    event: result.event,
629                })
630            }
631        }
632    }
633}
634
635#[async_trait::async_trait]
636impl RuntimeEffectController for SqliteRuntimeEffectController {
637    fn durability_tier(&self) -> DurabilityTier {
638        DurabilityTier::Durable
639    }
640
641    fn requires_durable_attachment_store(&self) -> bool {
642        true
643    }
644
645    fn supports_durable_effects(&self) -> bool {
646        true
647    }
648
649    async fn execute_effect(
650        &self,
651        envelope: RuntimeEffectEnvelope,
652        local_executor: RuntimeEffectLocalExecutor<'_>,
653    ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
654        loop {
655            match self.prepare_effect(&envelope).await? {
656                PreparedEffect::ReplayOutcome { outcome, due_at_ms } => {
657                    sleep_until_due(due_at_ms).await;
658                    return Ok(*outcome);
659                }
660                PreparedEffect::ReplayError(err) => return Err(err),
661                PreparedEffect::Claimed(claim) => {
662                    let result = self
663                        .execute_claimed_effect_with_renewal(&claim, envelope, local_executor)
664                        .await;
665                    let finalize = self.finalize_effect(&claim, &result).await;
666                    return match (result, finalize) {
667                        (Ok(outcome), Ok(())) => Ok(outcome),
668                        (Err(err), Ok(())) => Err(err),
669                        (_, Err(err)) => Err(err),
670                    };
671                }
672                PreparedEffect::Busy { retry_at_ms } => {
673                    sleep_until_retry(retry_at_ms).await;
674                }
675            }
676        }
677    }
678}
679
680async fn open_effect_replay_inner(
681    path: &Path,
682    backing: StoreBacking,
683    options: SqliteEffectReplayOptions,
684) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
685    let conn = SqliteConnection::open(path).await?;
686    ensure_effect_schema(&conn).await?;
687    apply_pragmas(&conn, backing).await?;
688    Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
689}
690
691async fn open_effect_replay_memory_inner(
692    options: SqliteEffectReplayOptions,
693) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
694    let conn = SqliteConnection::open_in_memory().await?;
695    ensure_effect_schema(&conn).await?;
696    apply_pragmas(&conn, StoreBacking::Memory).await?;
697    Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
698}
699
700impl SqliteEffectReplayInner {
701    fn new(conn: SqliteConnection, options: SqliteEffectReplayOptions) -> Self {
702        let sequence = EFFECT_OWNER_COUNTER.fetch_add(1, Ordering::SeqCst);
703        Self {
704            conn,
705            owner_id: format!(
706                "pid{}-{sequence}-{}",
707                std::process::id(),
708                current_epoch_ms()
709            ),
710            lease_counter: AtomicU64::new(1),
711            replay_mode: AtomicBool::new(false),
712            lease_ttl_ms: duration_ms(options.lease_ttl),
713        }
714    }
715
716    fn next_lease_token(&self) -> String {
717        let sequence = self.lease_counter.fetch_add(1, Ordering::SeqCst);
718        format!("{}:{sequence}", self.owner_id)
719    }
720}
721
722fn duration_ms(duration: Duration) -> u64 {
723    let millis = duration.as_millis();
724    if millis == 0 {
725        1
726    } else {
727        millis.min(u128::from(u64::MAX)) as u64
728    }
729}
730
731fn sleep_due_at_ms(envelope: &RuntimeEffectEnvelope, now: u64) -> Option<u64> {
732    match envelope.command {
733        RuntimeEffectCommand::Sleep { duration_ms } => Some(now.saturating_add(duration_ms)),
734        _ => None,
735    }
736}
737
738async fn sleep_until_due(due_at_ms: Option<u64>) {
739    let Some(due_at_ms) = due_at_ms else {
740        return;
741    };
742    let now = current_epoch_ms();
743    if due_at_ms > now {
744        tokio::time::sleep(Duration::from_millis(due_at_ms - now)).await;
745    }
746}
747
748async fn sleep_until_retry(retry_at_ms: u64) {
749    let now = current_epoch_ms();
750    let delay = if retry_at_ms > now {
751        Duration::from_millis(retry_at_ms - now).min(BUSY_POLL)
752    } else {
753        BUSY_POLL
754    };
755    tokio::time::sleep(delay).await;
756}
757
758fn effect_sqlite_error(err: rusqlite::Error) -> RuntimeEffectControllerError {
759    RuntimeEffectControllerError::new("sqlite_effect_replay_store", err.to_string())
760}
761
762fn effect_encode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
763    RuntimeEffectControllerError::new(
764        "sqlite_effect_replay_encode",
765        format!("failed to encode runtime effect replay row: {err}"),
766    )
767}
768
769fn effect_decode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
770    RuntimeEffectControllerError::new(
771        "sqlite_effect_replay_decode",
772        format!("failed to decode runtime effect replay row: {err}"),
773    )
774}