1use std::path::Path;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::time::Duration;
17
18use lash_core::{
19 DurabilityTier, EffectHost, ExecutionScope, PluginError, ProcessCommand, ProcessEffectOutcome,
20 RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
21 RuntimeEffectEnvelope, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeError,
22 ScopedEffectController,
23};
24
25use super::*;
26
27const STATUS_IN_PROGRESS: &str = "in_progress";
28const STATUS_COMPLETED: &str = "completed";
29const STATUS_FAILED: &str = "failed";
30const DEFAULT_LEASE_TTL: Duration = Duration::from_secs(30);
31const BUSY_POLL: Duration = Duration::from_millis(25);
32
33static EFFECT_OWNER_COUNTER: AtomicU64 = AtomicU64::new(1);
34
35#[derive(Clone, Debug)]
37pub struct SqliteEffectReplayOptions {
38 pub lease_ttl: Duration,
39}
40
41impl Default for SqliteEffectReplayOptions {
42 fn default() -> Self {
43 Self {
44 lease_ttl: DEFAULT_LEASE_TTL,
45 }
46 }
47}
48
49struct SqliteEffectReplayInner {
50 conn: SqliteConnection,
51 owner_id: String,
52 lease_counter: AtomicU64,
53 replay_mode: AtomicBool,
54 lease_ttl_ms: u64,
55}
56
57#[derive(Clone)]
63pub struct SqliteEffectHost {
64 inner: Arc<SqliteEffectReplayInner>,
65}
66
67#[derive(Clone)]
69pub struct SqliteRuntimeEffectController {
70 inner: Arc<SqliteEffectReplayInner>,
71 scope: ExecutionScope,
72}
73
74struct ClaimedEffect {
75 scope_id: String,
76 replay_key: String,
77 envelope_hash: String,
78 lease_token: String,
79 due_at_ms: Option<u64>,
80}
81
82enum PreparedEffect {
83 ReplayOutcome {
84 outcome: Box<RuntimeEffectOutcome>,
85 due_at_ms: Option<u64>,
86 },
87 ReplayError(RuntimeEffectControllerError),
88 Claimed(ClaimedEffect),
89 Busy {
90 retry_at_ms: u64,
91 },
92}
93
94impl SqliteEffectHost {
95 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
96 Self::open_with_options(path, SqliteEffectReplayOptions::default()).await
97 }
98
99 pub async fn open_with_options(
100 path: &Path,
101 options: SqliteEffectReplayOptions,
102 ) -> tokio_rusqlite::Result<Self> {
103 Ok(Self {
104 inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
105 })
106 }
107
108 pub async fn memory() -> tokio_rusqlite::Result<Self> {
109 Self::memory_with_options(SqliteEffectReplayOptions::default()).await
110 }
111
112 pub async fn memory_with_options(
113 options: SqliteEffectReplayOptions,
114 ) -> tokio_rusqlite::Result<Self> {
115 Ok(Self {
116 inner: open_effect_replay_memory_inner(options).await?,
117 })
118 }
119
120 pub fn start_replay(&self) {
123 self.inner.replay_mode.store(true, Ordering::SeqCst);
124 }
125}
126
127impl EffectHost for SqliteEffectHost {
128 fn durability_tier(&self) -> DurabilityTier {
129 DurabilityTier::Durable
130 }
131
132 fn requires_durable_attachment_store(&self) -> bool {
133 true
134 }
135
136 fn scoped<'run>(
137 &'run self,
138 scope: ExecutionScope,
139 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
140 let controller = SqliteRuntimeEffectController {
141 inner: Arc::clone(&self.inner),
142 scope: scope.clone(),
143 };
144 ScopedEffectController::shared(Arc::new(controller), scope)
145 }
146
147 fn scoped_static(
148 &self,
149 scope: ExecutionScope,
150 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
151 let controller = SqliteRuntimeEffectController {
152 inner: Arc::clone(&self.inner),
153 scope: scope.clone(),
154 };
155 Ok(Some(ScopedEffectController::shared(
156 Arc::new(controller),
157 scope,
158 )?))
159 }
160}
161
162impl SqliteRuntimeEffectController {
163 pub async fn open(path: &Path, scope: ExecutionScope) -> tokio_rusqlite::Result<Self> {
164 Self::open_with_options(path, scope, SqliteEffectReplayOptions::default()).await
165 }
166
167 pub async fn open_with_options(
168 path: &Path,
169 scope: ExecutionScope,
170 options: SqliteEffectReplayOptions,
171 ) -> tokio_rusqlite::Result<Self> {
172 Ok(Self {
173 inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
174 scope,
175 })
176 }
177
178 pub async fn memory(scope: ExecutionScope) -> tokio_rusqlite::Result<Self> {
179 Self::memory_with_options(scope, SqliteEffectReplayOptions::default()).await
180 }
181
182 pub async fn memory_with_options(
183 scope: ExecutionScope,
184 options: SqliteEffectReplayOptions,
185 ) -> tokio_rusqlite::Result<Self> {
186 Ok(Self {
187 inner: open_effect_replay_memory_inner(options).await?,
188 scope,
189 })
190 }
191
192 pub fn start_replay(&self) {
195 self.inner.replay_mode.store(true, Ordering::SeqCst);
196 }
197
198 async fn prepare_effect(
199 &self,
200 envelope: &RuntimeEffectEnvelope,
201 ) -> Result<PreparedEffect, RuntimeEffectControllerError> {
202 let replay_key = envelope
203 .invocation
204 .replay_key()
205 .ok_or_else(|| {
206 RuntimeEffectControllerError::new(
207 "sqlite_effect_replay_key_missing",
208 "runtime effect envelope requires replay.key",
209 )
210 })?
211 .to_string();
212 let envelope_hash = envelope.stable_hash()?;
213 let scope_id = self.scope.id().to_string();
214 let now = current_epoch_ms();
215 let lease_token = self.inner.next_lease_token();
216 let due_at_ms = sleep_due_at_ms(envelope, now);
217 let lease_expires_at_ms = now.saturating_add(self.inner.lease_ttl_ms);
218 let replay_mode = self.inner.replay_mode.load(Ordering::SeqCst);
219 let owner_id = self.inner.owner_id.clone();
220 let lease_ttl_ms = self.inner.lease_ttl_ms;
221
222 let outcome: Result<PreparedEffect, RuntimeEffectControllerError> = self
230 .inner
231 .conn
232 .write(move |tx| {
233 let row = tx
234 .query_row(
235 "SELECT envelope_hash, status, outcome_json, error_json,
236 lease_owner_id, lease_token, lease_expires_at_ms, due_at_ms
237 FROM runtime_effect_replay
238 WHERE scope_id = ?1 AND replay_key = ?2",
239 params![scope_id.as_str(), replay_key.as_str()],
240 |row| {
241 Ok((
242 row.get::<_, String>(0)?,
243 row.get::<_, String>(1)?,
244 row.get::<_, Option<String>>(2)?,
245 row.get::<_, Option<String>>(3)?,
246 row.get::<_, i64>(6)?,
247 row.get::<_, Option<i64>>(7)?,
248 ))
249 },
250 )
251 .optional()?;
252
253 let Some((
254 existing_hash,
255 status,
256 outcome_json,
257 error_json,
258 lease_expires_row,
259 existing_due_row,
260 )) = row
261 else {
262 if replay_mode {
263 return Ok(Err(RuntimeEffectControllerError::new(
264 "sqlite_effect_replay_missing",
265 format!(
266 "no recorded runtime effect for scope `{scope_id}` and replay key `{replay_key}`"
267 ),
268 )));
269 }
270 let due_at_param = due_at_ms.map(|value| value as i64);
271 tx.execute(
272 "INSERT INTO runtime_effect_replay (
273 scope_id, replay_key, envelope_hash, status, outcome_json,
274 error_json, lease_owner_id, lease_token, lease_expires_at_ms,
275 due_at_ms, created_at_ms, updated_at_ms
276 )
277 VALUES (?1, ?2, ?3, ?4, NULL, NULL, ?5, ?6, ?7, ?8, ?9, ?10)",
278 params![
279 scope_id.as_str(),
280 replay_key.as_str(),
281 envelope_hash.as_str(),
282 STATUS_IN_PROGRESS,
283 owner_id.as_str(),
284 lease_token.as_str(),
285 lease_expires_at_ms as i64,
286 due_at_param,
287 now as i64,
288 now as i64,
289 ],
290 )?;
291 return Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
292 scope_id,
293 replay_key,
294 envelope_hash,
295 lease_token,
296 due_at_ms,
297 })));
298 };
299
300 if existing_hash != envelope_hash {
301 return Ok(Err(RuntimeEffectControllerError::new(
302 "sqlite_effect_replay_hash_conflict",
303 format!(
304 "runtime effect replay key `{replay_key}` in scope `{scope_id}` was reused with a different envelope hash"
305 ),
306 )));
307 }
308
309 let lease_expires_at_ms = lease_expires_row as u64;
310 let existing_due_at_ms = existing_due_row.map(|value| value as u64);
311
312 match status.as_str() {
313 STATUS_COMPLETED => {
314 let Some(json) = outcome_json else {
315 return Ok(Err(RuntimeEffectControllerError::new(
316 "sqlite_effect_replay_corrupt_row",
317 "completed runtime effect row is missing outcome_json",
318 )));
319 };
320 let outcome = match serde_json::from_str(&json) {
321 Ok(outcome) => outcome,
322 Err(err) => return Ok(Err(effect_decode_error(err))),
323 };
324 Ok(Ok(PreparedEffect::ReplayOutcome {
325 outcome: Box::new(outcome),
326 due_at_ms: existing_due_at_ms,
327 }))
328 }
329 STATUS_FAILED => {
330 let Some(json) = error_json else {
331 return Ok(Err(RuntimeEffectControllerError::new(
332 "sqlite_effect_replay_corrupt_row",
333 "failed runtime effect row is missing error_json",
334 )));
335 };
336 let err = match serde_json::from_str(&json) {
337 Ok(err) => err,
338 Err(err) => return Ok(Err(effect_decode_error(err))),
339 };
340 Ok(Ok(PreparedEffect::ReplayError(err)))
341 }
342 STATUS_IN_PROGRESS if lease_expires_at_ms > now => Ok(Ok(PreparedEffect::Busy {
343 retry_at_ms: lease_expires_at_ms,
344 })),
345 STATUS_IN_PROGRESS => {
346 let due_at_ms = existing_due_at_ms.or(due_at_ms);
347 let due_at_param = due_at_ms.map(|value| value as i64);
348 tx.execute(
349 "UPDATE runtime_effect_replay
350 SET lease_owner_id = ?3,
351 lease_token = ?4,
352 lease_expires_at_ms = ?5,
353 due_at_ms = ?6,
354 updated_at_ms = ?7
355 WHERE scope_id = ?1 AND replay_key = ?2",
356 params![
357 scope_id.as_str(),
358 replay_key.as_str(),
359 owner_id.as_str(),
360 lease_token.as_str(),
361 current_epoch_ms().saturating_add(lease_ttl_ms) as i64,
362 due_at_param,
363 current_epoch_ms() as i64,
364 ],
365 )?;
366 Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
367 scope_id,
368 replay_key,
369 envelope_hash,
370 lease_token,
371 due_at_ms,
372 })))
373 }
374 other => Ok(Err(RuntimeEffectControllerError::new(
375 "sqlite_effect_replay_corrupt_row",
376 format!("unknown runtime effect replay status `{other}`"),
377 ))),
378 }
379 })
380 .await
381 .map_err(effect_sqlite_error)?;
382 outcome
383 }
384
385 async fn finalize_effect(
386 &self,
387 claim: &ClaimedEffect,
388 outcome: &Result<RuntimeEffectOutcome, RuntimeEffectControllerError>,
389 ) -> Result<(), RuntimeEffectControllerError> {
390 let (status, outcome_json, error_json) = match outcome {
391 Ok(outcome) => (
392 STATUS_COMPLETED,
393 Some(serde_json::to_string(outcome).map_err(effect_encode_error)?),
394 None,
395 ),
396 Err(err) => (
397 STATUS_FAILED,
398 None,
399 Some(serde_json::to_string(err).map_err(effect_encode_error)?),
400 ),
401 };
402 let now = current_epoch_ms();
403 let scope_id = claim.scope_id.clone();
404 let replay_key = claim.replay_key.clone();
405 let envelope_hash = claim.envelope_hash.clone();
406 let owner_id = self.inner.owner_id.clone();
407 let lease_token = claim.lease_token.clone();
408 let status = status.to_string();
409
410 let result: Result<(), RuntimeEffectControllerError> = self
411 .inner
412 .conn
413 .write(move |tx| {
414 let changed = tx.execute(
415 "UPDATE runtime_effect_replay
416 SET status = ?6,
417 outcome_json = ?7,
418 error_json = ?8,
419 lease_owner_id = NULL,
420 lease_token = NULL,
421 lease_expires_at_ms = 0,
422 updated_at_ms = ?9
423 WHERE scope_id = ?1
424 AND replay_key = ?2
425 AND envelope_hash = ?3
426 AND lease_owner_id = ?4
427 AND lease_token = ?5
428 AND status = 'in_progress'
429 AND lease_expires_at_ms > ?10",
430 params![
431 scope_id.as_str(),
432 replay_key.as_str(),
433 envelope_hash.as_str(),
434 owner_id.as_str(),
435 lease_token.as_str(),
436 status.as_str(),
437 outcome_json,
438 error_json,
439 now as i64,
440 now as i64,
441 ],
442 )?;
443 if changed != 1 {
444 return Ok(Err(RuntimeEffectControllerError::new(
445 "sqlite_effect_replay_lease_lost",
446 format!(
447 "runtime effect replay lease was lost before finalizing scope `{scope_id}` replay key `{replay_key}`"
448 ),
449 )));
450 }
451 Ok(Ok(()))
452 })
453 .await
454 .map_err(effect_sqlite_error)?;
455 result
456 }
457
458 async fn renew_effect_lease(
459 &self,
460 claim: &ClaimedEffect,
461 ) -> Result<(), RuntimeEffectControllerError> {
462 let now = current_epoch_ms();
463 let renewed_expires_at = now.saturating_add(self.inner.lease_ttl_ms);
464 let scope_id = claim.scope_id.clone();
465 let replay_key = claim.replay_key.clone();
466 let envelope_hash = claim.envelope_hash.clone();
467 let owner_id = self.inner.owner_id.clone();
468 let lease_token = claim.lease_token.clone();
469
470 let result: Result<(), RuntimeEffectControllerError> = self
471 .inner
472 .conn
473 .write(move |tx| {
474 let changed = tx.execute(
475 "UPDATE runtime_effect_replay
476 SET lease_expires_at_ms = ?6,
477 updated_at_ms = ?7
478 WHERE scope_id = ?1
479 AND replay_key = ?2
480 AND envelope_hash = ?3
481 AND lease_owner_id = ?4
482 AND lease_token = ?5
483 AND status = 'in_progress'
484 AND lease_expires_at_ms > ?8",
485 params![
486 scope_id.as_str(),
487 replay_key.as_str(),
488 envelope_hash.as_str(),
489 owner_id.as_str(),
490 lease_token.as_str(),
491 renewed_expires_at as i64,
492 now as i64,
493 now as i64,
494 ],
495 )?;
496 if changed != 1 {
497 return Ok(Err(RuntimeEffectControllerError::new(
498 "sqlite_effect_replay_lease_lost",
499 format!(
500 "runtime effect replay lease was lost while executing scope `{scope_id}` replay key `{replay_key}`"
501 ),
502 )));
503 }
504 Ok(Ok(()))
505 })
506 .await
507 .map_err(effect_sqlite_error)?;
508 result
509 }
510
511 async fn execute_claimed_effect_with_renewal(
512 &self,
513 claim: &ClaimedEffect,
514 envelope: RuntimeEffectEnvelope,
515 local_executor: RuntimeEffectLocalExecutor<'_>,
516 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
517 let renew_every = Duration::from_millis((self.inner.lease_ttl_ms / 3).max(1));
518 let effect = self.execute_claimed_effect(claim, envelope, local_executor);
519 tokio::pin!(effect);
520 let renew_sleep = tokio::time::sleep(renew_every);
521 tokio::pin!(renew_sleep);
522
523 loop {
524 tokio::select! {
525 result = &mut effect => return result,
526 _ = &mut renew_sleep => {
527 self.renew_effect_lease(claim).await?;
528 renew_sleep.as_mut().reset(tokio::time::Instant::now() + renew_every);
529 }
530 }
531 }
532 }
533
534 async fn execute_claimed_effect(
535 &self,
536 claim: &ClaimedEffect,
537 envelope: RuntimeEffectEnvelope,
538 local_executor: RuntimeEffectLocalExecutor<'_>,
539 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
540 if matches!(envelope.command, RuntimeEffectCommand::Sleep { .. }) {
541 sleep_until_due(claim.due_at_ms).await;
542 return Ok(RuntimeEffectOutcome::Sleep);
543 }
544 match envelope.command {
545 RuntimeEffectCommand::Process { command } => {
546 let result = self
547 .execute_process_command(*command, local_executor)
548 .await?;
549 Ok(RuntimeEffectOutcome::Process { result })
550 }
551 _ => local_executor.execute(envelope).await,
552 }
553 }
554
555 async fn execute_process_command(
556 &self,
557 command: ProcessCommand,
558 local_executor: RuntimeEffectLocalExecutor<'_>,
559 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
560 let execution = local_executor.into_process()?;
561 let registry = execution.registry;
562 match command {
563 ProcessCommand::Start {
564 registration,
565 grant,
566 execution_context: _,
567 } => {
568 let registration_id = registration.id.clone();
569 let record = registry.register_process(registration).await?;
570 if let Some(grant) = grant {
571 registry
572 .grant_handle(&grant.session_scope, ®istration_id, grant.descriptor)
573 .await?;
574 }
575 Ok(ProcessEffectOutcome::Start { record })
576 }
577 ProcessCommand::List {
578 session_scope,
579 mode,
580 } => {
581 let entries = match mode {
582 lash_core::ProcessListMode::Live => {
583 registry.list_live_handle_grants(&session_scope).await?
584 }
585 lash_core::ProcessListMode::All => {
586 registry.list_handle_grants(&session_scope).await?
587 }
588 };
589 Ok(ProcessEffectOutcome::List { entries })
590 }
591 ProcessCommand::Transfer {
592 from_scope,
593 to_scope,
594 process_ids,
595 } => {
596 registry
597 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
598 .await?;
599 Ok(ProcessEffectOutcome::Transfer)
600 }
601 ProcessCommand::DeleteSession { session_id } => {
602 let report = registry.delete_session_process_state(&session_id).await?;
603 Ok(ProcessEffectOutcome::DeleteSession { report })
604 }
605 ProcessCommand::Await { process_id } => {
606 let output = registry.await_process(&process_id).await?;
607 Ok(ProcessEffectOutcome::Await { output })
608 }
609 ProcessCommand::Cancel { process_id, reason } => {
610 registry
611 .append_event(
612 &process_id,
613 lash_core::ProcessEventAppendRequest::cancel_requested(&process_id, reason),
614 )
615 .await?;
616 let record = registry.get_process(&process_id).await.ok_or_else(|| {
617 PluginError::Session(format!("unknown process `{process_id}`"))
618 })?;
619 Ok(ProcessEffectOutcome::Cancel { record })
620 }
621 ProcessCommand::Signal {
622 process_id,
623 request,
624 ..
625 } => {
626 let result = registry.append_event(&process_id, request).await?;
627 Ok(ProcessEffectOutcome::Signal {
628 event: result.event,
629 })
630 }
631 }
632 }
633}
634
635#[async_trait::async_trait]
636impl RuntimeEffectController for SqliteRuntimeEffectController {
637 fn durability_tier(&self) -> DurabilityTier {
638 DurabilityTier::Durable
639 }
640
641 fn requires_durable_attachment_store(&self) -> bool {
642 true
643 }
644
645 fn supports_durable_effects(&self) -> bool {
646 true
647 }
648
649 async fn execute_effect(
650 &self,
651 envelope: RuntimeEffectEnvelope,
652 local_executor: RuntimeEffectLocalExecutor<'_>,
653 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
654 loop {
655 match self.prepare_effect(&envelope).await? {
656 PreparedEffect::ReplayOutcome { outcome, due_at_ms } => {
657 sleep_until_due(due_at_ms).await;
658 return Ok(*outcome);
659 }
660 PreparedEffect::ReplayError(err) => return Err(err),
661 PreparedEffect::Claimed(claim) => {
662 let result = self
663 .execute_claimed_effect_with_renewal(&claim, envelope, local_executor)
664 .await;
665 let finalize = self.finalize_effect(&claim, &result).await;
666 return match (result, finalize) {
667 (Ok(outcome), Ok(())) => Ok(outcome),
668 (Err(err), Ok(())) => Err(err),
669 (_, Err(err)) => Err(err),
670 };
671 }
672 PreparedEffect::Busy { retry_at_ms } => {
673 sleep_until_retry(retry_at_ms).await;
674 }
675 }
676 }
677 }
678}
679
680async fn open_effect_replay_inner(
681 path: &Path,
682 backing: StoreBacking,
683 options: SqliteEffectReplayOptions,
684) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
685 let conn = SqliteConnection::open(path).await?;
686 ensure_effect_schema(&conn).await?;
687 apply_pragmas(&conn, backing).await?;
688 Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
689}
690
691async fn open_effect_replay_memory_inner(
692 options: SqliteEffectReplayOptions,
693) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
694 let conn = SqliteConnection::open_in_memory().await?;
695 ensure_effect_schema(&conn).await?;
696 apply_pragmas(&conn, StoreBacking::Memory).await?;
697 Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
698}
699
700impl SqliteEffectReplayInner {
701 fn new(conn: SqliteConnection, options: SqliteEffectReplayOptions) -> Self {
702 let sequence = EFFECT_OWNER_COUNTER.fetch_add(1, Ordering::SeqCst);
703 Self {
704 conn,
705 owner_id: format!(
706 "pid{}-{sequence}-{}",
707 std::process::id(),
708 current_epoch_ms()
709 ),
710 lease_counter: AtomicU64::new(1),
711 replay_mode: AtomicBool::new(false),
712 lease_ttl_ms: duration_ms(options.lease_ttl),
713 }
714 }
715
716 fn next_lease_token(&self) -> String {
717 let sequence = self.lease_counter.fetch_add(1, Ordering::SeqCst);
718 format!("{}:{sequence}", self.owner_id)
719 }
720}
721
722fn duration_ms(duration: Duration) -> u64 {
723 let millis = duration.as_millis();
724 if millis == 0 {
725 1
726 } else {
727 millis.min(u128::from(u64::MAX)) as u64
728 }
729}
730
731fn sleep_due_at_ms(envelope: &RuntimeEffectEnvelope, now: u64) -> Option<u64> {
732 match envelope.command {
733 RuntimeEffectCommand::Sleep { duration_ms } => Some(now.saturating_add(duration_ms)),
734 _ => None,
735 }
736}
737
738async fn sleep_until_due(due_at_ms: Option<u64>) {
739 let Some(due_at_ms) = due_at_ms else {
740 return;
741 };
742 let now = current_epoch_ms();
743 if due_at_ms > now {
744 tokio::time::sleep(Duration::from_millis(due_at_ms - now)).await;
745 }
746}
747
748async fn sleep_until_retry(retry_at_ms: u64) {
749 let now = current_epoch_ms();
750 let delay = if retry_at_ms > now {
751 Duration::from_millis(retry_at_ms - now).min(BUSY_POLL)
752 } else {
753 BUSY_POLL
754 };
755 tokio::time::sleep(delay).await;
756}
757
758fn effect_sqlite_error(err: rusqlite::Error) -> RuntimeEffectControllerError {
759 RuntimeEffectControllerError::new("sqlite_effect_replay_store", err.to_string())
760}
761
762fn effect_encode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
763 RuntimeEffectControllerError::new(
764 "sqlite_effect_replay_encode",
765 format!("failed to encode runtime effect replay row: {err}"),
766 )
767}
768
769fn effect_decode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
770 RuntimeEffectControllerError::new(
771 "sqlite_effect_replay_decode",
772 format!("failed to decode runtime effect replay row: {err}"),
773 )
774}