1use std::path::Path;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::time::Duration;
17
18use lash_core::{
19 AwaitEventResolver, DurabilityTier, EffectHost, ExecutionScope, LeaseTimings,
20 RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
21 RuntimeEffectEnvelope, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeError,
22 ScopedEffectController,
23};
24
25use super::*;
26
/// Row status written when an effect is claimed and still running under a lease.
const STATUS_IN_PROGRESS: &str = "in_progress";
/// Row status written when an effect finished and its outcome JSON was stored.
const STATUS_COMPLETED: &str = "completed";
/// Row status written when an effect finished with an error whose JSON was stored.
const STATUS_FAILED: &str = "failed";
/// Longest single wait before a caller re-checks a row that another lease holds.
const BUSY_POLL: Duration = Duration::from_millis(25);

/// Process-wide sequence mixed into each store's owner id so that several stores
/// opened by the same process do not share an identity.
static EFFECT_OWNER_COUNTER: AtomicU64 = AtomicU64::new(1);
33
34#[derive(Clone, Debug, Default)]
36pub struct SqliteEffectReplayOptions {
37 pub lease_timings: LeaseTimings,
41}
42
43struct SqliteEffectReplayInner {
44 conn: SqliteConnection,
45 owner_id: String,
46 lease_counter: AtomicU64,
47 replay_mode: AtomicBool,
48 lease_timings: LeaseTimings,
49}
50
51#[derive(Clone)]
57pub struct SqliteEffectHost {
58 inner: Arc<SqliteEffectReplayInner>,
59}
60
61#[derive(Clone)]
63pub struct SqliteRuntimeEffectController {
64 inner: Arc<SqliteEffectReplayInner>,
65 scope: ExecutionScope,
66}
67
/// Identifies a replay row this controller currently holds a lease on.
struct ClaimedEffect {
    /// Scope id component of the row key.
    scope_id: String,
    /// Replay key component of the row key.
    replay_key: String,
    /// Stable hash of the envelope the row was claimed for.
    envelope_hash: String,
    /// Token that must still match the row for renewals and finalization to apply.
    lease_token: String,
    /// Wall-clock deadline (epoch milliseconds) for sleep effects; `None` otherwise.
    due_at_ms: Option<u64>,
}
75
76enum PreparedEffect {
77 ReplayOutcome {
78 outcome: Box<RuntimeEffectOutcome>,
79 due_at_ms: Option<u64>,
80 },
81 ReplayError(RuntimeEffectControllerError),
82 Claimed(ClaimedEffect),
83 Busy {
84 retry_at_ms: u64,
85 },
86}
87
88impl SqliteEffectHost {
89 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
90 Self::open_with_options(path, SqliteEffectReplayOptions::default()).await
91 }
92
93 pub async fn open_with_options(
94 path: &Path,
95 options: SqliteEffectReplayOptions,
96 ) -> tokio_rusqlite::Result<Self> {
97 Ok(Self {
98 inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
99 })
100 }
101
102 pub async fn memory() -> tokio_rusqlite::Result<Self> {
103 Self::memory_with_options(SqliteEffectReplayOptions::default()).await
104 }
105
106 pub async fn memory_with_options(
107 options: SqliteEffectReplayOptions,
108 ) -> tokio_rusqlite::Result<Self> {
109 Ok(Self {
110 inner: open_effect_replay_memory_inner(options).await?,
111 })
112 }
113
114 pub fn start_replay(&self) {
117 self.inner.replay_mode.store(true, Ordering::SeqCst);
118 }
119}
120
121impl AwaitEventResolver for SqliteEffectHost {
122 fn durability_tier(&self) -> DurabilityTier {
123 DurabilityTier::Durable
124 }
125
126 fn requires_durable_attachment_store(&self) -> bool {
127 true
128 }
129}
130
131impl EffectHost for SqliteEffectHost {
132 fn scoped<'run>(
133 &'run self,
134 scope: ExecutionScope,
135 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
136 let controller = SqliteRuntimeEffectController {
137 inner: Arc::clone(&self.inner),
138 scope: scope.clone(),
139 };
140 ScopedEffectController::shared(Arc::new(controller), scope)
141 }
142
143 fn scoped_static(
144 &self,
145 scope: ExecutionScope,
146 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
147 let controller = SqliteRuntimeEffectController {
148 inner: Arc::clone(&self.inner),
149 scope: scope.clone(),
150 };
151 Ok(Some(ScopedEffectController::shared(
152 Arc::new(controller),
153 scope,
154 )?))
155 }
156}
157
158impl SqliteRuntimeEffectController {
159 pub async fn open(path: &Path, scope: ExecutionScope) -> tokio_rusqlite::Result<Self> {
160 Self::open_with_options(path, scope, SqliteEffectReplayOptions::default()).await
161 }
162
163 pub async fn open_with_options(
164 path: &Path,
165 scope: ExecutionScope,
166 options: SqliteEffectReplayOptions,
167 ) -> tokio_rusqlite::Result<Self> {
168 Ok(Self {
169 inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
170 scope,
171 })
172 }
173
174 pub async fn memory(scope: ExecutionScope) -> tokio_rusqlite::Result<Self> {
175 Self::memory_with_options(scope, SqliteEffectReplayOptions::default()).await
176 }
177
178 pub async fn memory_with_options(
179 scope: ExecutionScope,
180 options: SqliteEffectReplayOptions,
181 ) -> tokio_rusqlite::Result<Self> {
182 Ok(Self {
183 inner: open_effect_replay_memory_inner(options).await?,
184 scope,
185 })
186 }
187
188 pub fn start_replay(&self) {
191 self.inner.replay_mode.store(true, Ordering::SeqCst);
192 }
193
194 async fn prepare_effect(
195 &self,
196 envelope: &RuntimeEffectEnvelope,
197 ) -> Result<PreparedEffect, RuntimeEffectControllerError> {
198 let replay_key = envelope
199 .invocation
200 .replay_key()
201 .ok_or_else(|| {
202 RuntimeEffectControllerError::new(
203 "sqlite_effect_replay_key_missing",
204 "runtime effect envelope requires replay.key",
205 )
206 })?
207 .to_string();
208 let envelope_hash = envelope.stable_hash()?;
209 let scope_id = self.scope.id().to_string();
210 let now = current_epoch_ms();
211 let lease_token = self.inner.next_lease_token();
212 let due_at_ms = sleep_due_at_ms(envelope, now);
213 let lease_ttl_ms = self.inner.lease_timings.ttl_ms();
214 let lease_expires_at_ms = now.saturating_add(lease_ttl_ms);
215 let replay_mode = self.inner.replay_mode.load(Ordering::SeqCst);
216 let owner_id = self.inner.owner_id.clone();
217
218 let outcome: Result<PreparedEffect, RuntimeEffectControllerError> = self
226 .inner
227 .conn
228 .write(move |tx| {
229 let row = tx
230 .query_row(
231 "SELECT envelope_hash, status, outcome_json, error_json,
232 lease_owner_id, lease_token, lease_expires_at_ms, due_at_ms
233 FROM runtime_effect_replay
234 WHERE scope_id = ?1 AND replay_key = ?2",
235 params![scope_id.as_str(), replay_key.as_str()],
236 |row| {
237 Ok((
238 row.get::<_, String>(0)?,
239 row.get::<_, String>(1)?,
240 row.get::<_, Option<String>>(2)?,
241 row.get::<_, Option<String>>(3)?,
242 row.get::<_, i64>(6)?,
243 row.get::<_, Option<i64>>(7)?,
244 ))
245 },
246 )
247 .optional()?;
248
249 let Some((
250 existing_hash,
251 status,
252 outcome_json,
253 error_json,
254 lease_expires_row,
255 existing_due_row,
256 )) = row
257 else {
258 if replay_mode {
259 return Ok(Err(RuntimeEffectControllerError::new(
260 "sqlite_effect_replay_missing",
261 format!(
262 "no recorded runtime effect for scope `{scope_id}` and replay key `{replay_key}`"
263 ),
264 )));
265 }
266 let due_at_param = due_at_ms.map(|value| value as i64);
267 tx.execute(
268 "INSERT INTO runtime_effect_replay (
269 scope_id, replay_key, envelope_hash, status, outcome_json,
270 error_json, lease_owner_id, lease_token, lease_expires_at_ms,
271 due_at_ms, created_at_ms, updated_at_ms
272 )
273 VALUES (?1, ?2, ?3, ?4, NULL, NULL, ?5, ?6, ?7, ?8, ?9, ?10)",
274 params![
275 scope_id.as_str(),
276 replay_key.as_str(),
277 envelope_hash.as_str(),
278 STATUS_IN_PROGRESS,
279 owner_id.as_str(),
280 lease_token.as_str(),
281 lease_expires_at_ms as i64,
282 due_at_param,
283 now as i64,
284 now as i64,
285 ],
286 )?;
287 return Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
288 scope_id,
289 replay_key,
290 envelope_hash,
291 lease_token,
292 due_at_ms,
293 })));
294 };
295
296 if existing_hash != envelope_hash {
297 return Ok(Err(RuntimeEffectControllerError::new(
298 "sqlite_effect_replay_hash_conflict",
299 format!(
300 "runtime effect replay key `{replay_key}` in scope `{scope_id}` was reused with a different envelope hash"
301 ),
302 )));
303 }
304
305 let lease_expires_at_ms = lease_expires_row as u64;
306 let existing_due_at_ms = existing_due_row.map(|value| value as u64);
307
308 match status.as_str() {
309 STATUS_COMPLETED => {
310 let Some(json) = outcome_json else {
311 return Ok(Err(RuntimeEffectControllerError::new(
312 "sqlite_effect_replay_corrupt_row",
313 "completed runtime effect row is missing outcome_json",
314 )));
315 };
316 let outcome = match serde_json::from_str(&json) {
317 Ok(outcome) => outcome,
318 Err(err) => return Ok(Err(effect_decode_error(err))),
319 };
320 Ok(Ok(PreparedEffect::ReplayOutcome {
321 outcome: Box::new(outcome),
322 due_at_ms: existing_due_at_ms,
323 }))
324 }
325 STATUS_FAILED => {
326 let Some(json) = error_json else {
327 return Ok(Err(RuntimeEffectControllerError::new(
328 "sqlite_effect_replay_corrupt_row",
329 "failed runtime effect row is missing error_json",
330 )));
331 };
332 let err = match serde_json::from_str(&json) {
333 Ok(err) => err,
334 Err(err) => return Ok(Err(effect_decode_error(err))),
335 };
336 Ok(Ok(PreparedEffect::ReplayError(err)))
337 }
338 STATUS_IN_PROGRESS if lease_expires_at_ms > now => Ok(Ok(PreparedEffect::Busy {
339 retry_at_ms: lease_expires_at_ms,
340 })),
341 STATUS_IN_PROGRESS => {
342 let due_at_ms = existing_due_at_ms.or(due_at_ms);
343 let due_at_param = due_at_ms.map(|value| value as i64);
344 tx.execute(
345 "UPDATE runtime_effect_replay
346 SET lease_owner_id = ?3,
347 lease_token = ?4,
348 lease_expires_at_ms = ?5,
349 due_at_ms = ?6,
350 updated_at_ms = ?7
351 WHERE scope_id = ?1 AND replay_key = ?2",
352 params![
353 scope_id.as_str(),
354 replay_key.as_str(),
355 owner_id.as_str(),
356 lease_token.as_str(),
357 current_epoch_ms().saturating_add(lease_ttl_ms) as i64,
358 due_at_param,
359 current_epoch_ms() as i64,
360 ],
361 )?;
362 Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
363 scope_id,
364 replay_key,
365 envelope_hash,
366 lease_token,
367 due_at_ms,
368 })))
369 }
370 other => Ok(Err(RuntimeEffectControllerError::new(
371 "sqlite_effect_replay_corrupt_row",
372 format!("unknown runtime effect replay status `{other}`"),
373 ))),
374 }
375 })
376 .await
377 .map_err(effect_sqlite_error)?;
378 outcome
379 }
380
381 async fn finalize_effect(
382 &self,
383 claim: &ClaimedEffect,
384 outcome: &Result<RuntimeEffectOutcome, RuntimeEffectControllerError>,
385 ) -> Result<(), RuntimeEffectControllerError> {
386 let (status, outcome_json, error_json) = match outcome {
387 Ok(outcome) => (
388 STATUS_COMPLETED,
389 Some(serde_json::to_string(outcome).map_err(effect_encode_error)?),
390 None,
391 ),
392 Err(err) => (
393 STATUS_FAILED,
394 None,
395 Some(serde_json::to_string(err).map_err(effect_encode_error)?),
396 ),
397 };
398 let now = current_epoch_ms();
399 let scope_id = claim.scope_id.clone();
400 let replay_key = claim.replay_key.clone();
401 let envelope_hash = claim.envelope_hash.clone();
402 let owner_id = self.inner.owner_id.clone();
403 let lease_token = claim.lease_token.clone();
404 let status = status.to_string();
405
406 let result: Result<(), RuntimeEffectControllerError> = self
407 .inner
408 .conn
409 .write(move |tx| {
410 let changed = tx.execute(
411 "UPDATE runtime_effect_replay
412 SET status = ?6,
413 outcome_json = ?7,
414 error_json = ?8,
415 lease_owner_id = NULL,
416 lease_token = NULL,
417 lease_expires_at_ms = 0,
418 updated_at_ms = ?9
419 WHERE scope_id = ?1
420 AND replay_key = ?2
421 AND envelope_hash = ?3
422 AND lease_owner_id = ?4
423 AND lease_token = ?5
424 AND status = 'in_progress'
425 AND lease_expires_at_ms > ?10",
426 params![
427 scope_id.as_str(),
428 replay_key.as_str(),
429 envelope_hash.as_str(),
430 owner_id.as_str(),
431 lease_token.as_str(),
432 status.as_str(),
433 outcome_json,
434 error_json,
435 now as i64,
436 now as i64,
437 ],
438 )?;
439 if changed != 1 {
440 return Ok(Err(RuntimeEffectControllerError::new(
441 "sqlite_effect_replay_lease_lost",
442 format!(
443 "runtime effect replay lease was lost before finalizing scope `{scope_id}` replay key `{replay_key}`"
444 ),
445 )));
446 }
447 Ok(Ok(()))
448 })
449 .await
450 .map_err(effect_sqlite_error)?;
451 result
452 }
453
454 async fn renew_effect_lease(
455 &self,
456 claim: &ClaimedEffect,
457 ) -> Result<(), RuntimeEffectControllerError> {
458 let now = current_epoch_ms();
459 let renewed_expires_at = now.saturating_add(self.inner.lease_timings.ttl_ms());
460 let scope_id = claim.scope_id.clone();
461 let replay_key = claim.replay_key.clone();
462 let envelope_hash = claim.envelope_hash.clone();
463 let owner_id = self.inner.owner_id.clone();
464 let lease_token = claim.lease_token.clone();
465
466 let result: Result<(), RuntimeEffectControllerError> = self
467 .inner
468 .conn
469 .write(move |tx| {
470 let changed = tx.execute(
471 "UPDATE runtime_effect_replay
472 SET lease_expires_at_ms = ?6,
473 updated_at_ms = ?7
474 WHERE scope_id = ?1
475 AND replay_key = ?2
476 AND envelope_hash = ?3
477 AND lease_owner_id = ?4
478 AND lease_token = ?5
479 AND status = 'in_progress'
480 AND lease_expires_at_ms > ?8",
481 params![
482 scope_id.as_str(),
483 replay_key.as_str(),
484 envelope_hash.as_str(),
485 owner_id.as_str(),
486 lease_token.as_str(),
487 renewed_expires_at as i64,
488 now as i64,
489 now as i64,
490 ],
491 )?;
492 if changed != 1 {
493 return Ok(Err(RuntimeEffectControllerError::new(
494 "sqlite_effect_replay_lease_lost",
495 format!(
496 "runtime effect replay lease was lost while executing scope `{scope_id}` replay key `{replay_key}`"
497 ),
498 )));
499 }
500 Ok(Ok(()))
501 })
502 .await
503 .map_err(effect_sqlite_error)?;
504 result
505 }
506
507 async fn execute_claimed_effect_with_renewal(
508 &self,
509 claim: &ClaimedEffect,
510 envelope: RuntimeEffectEnvelope,
511 local_executor: RuntimeEffectLocalExecutor<'_>,
512 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
513 let renew_every = self.inner.lease_timings.renew_interval();
514 let effect = self.execute_claimed_effect(claim, envelope, local_executor);
515 tokio::pin!(effect);
516 let renew_sleep = tokio::time::sleep(renew_every);
517 tokio::pin!(renew_sleep);
518
519 loop {
520 tokio::select! {
521 result = &mut effect => return result,
522 _ = &mut renew_sleep => {
523 self.renew_effect_lease(claim).await?;
524 renew_sleep.as_mut().reset(tokio::time::Instant::now() + renew_every);
525 }
526 }
527 }
528 }
529
530 async fn execute_claimed_effect(
531 &self,
532 claim: &ClaimedEffect,
533 envelope: RuntimeEffectEnvelope,
534 local_executor: RuntimeEffectLocalExecutor<'_>,
535 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
536 if matches!(envelope.command, RuntimeEffectCommand::Sleep { .. }) {
537 sleep_until_due(claim.due_at_ms).await;
538 return Ok(RuntimeEffectOutcome::Sleep);
539 }
540 match envelope.command {
541 RuntimeEffectCommand::Process { command } => {
542 let result = local_executor.into_process()?.execute(*command).await?;
543 Ok(RuntimeEffectOutcome::Process { result })
544 }
545 _ => local_executor.execute(envelope).await,
546 }
547 }
548}
549
550impl AwaitEventResolver for SqliteRuntimeEffectController {
551 fn durability_tier(&self) -> DurabilityTier {
552 DurabilityTier::Durable
553 }
554
555 fn requires_durable_attachment_store(&self) -> bool {
556 true
557 }
558
559 fn supports_durable_effects(&self) -> bool {
560 true
561 }
562}
563
564#[async_trait::async_trait]
565impl RuntimeEffectController for SqliteRuntimeEffectController {
566 async fn execute_effect(
567 &self,
568 envelope: RuntimeEffectEnvelope,
569 local_executor: RuntimeEffectLocalExecutor<'_>,
570 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
571 loop {
572 match self.prepare_effect(&envelope).await? {
573 PreparedEffect::ReplayOutcome { outcome, due_at_ms } => {
574 sleep_until_due(due_at_ms).await;
575 return Ok(*outcome);
576 }
577 PreparedEffect::ReplayError(err) => return Err(err),
578 PreparedEffect::Claimed(claim) => {
579 let result = self
580 .execute_claimed_effect_with_renewal(&claim, envelope, local_executor)
581 .await;
582 let finalize = self.finalize_effect(&claim, &result).await;
583 return match (result, finalize) {
584 (Ok(outcome), Ok(())) => Ok(outcome),
585 (Err(err), Ok(())) => Err(err),
586 (_, Err(err)) => Err(err),
587 };
588 }
589 PreparedEffect::Busy { retry_at_ms } => {
590 sleep_until_retry(retry_at_ms).await;
591 }
592 }
593 }
594 }
595}
596
597async fn open_effect_replay_inner(
598 path: &Path,
599 backing: StoreBacking,
600 options: SqliteEffectReplayOptions,
601) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
602 let conn = SqliteConnection::open(path).await?;
603 ensure_effect_schema(&conn).await?;
604 apply_pragmas(&conn, backing).await?;
605 Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
606}
607
608async fn open_effect_replay_memory_inner(
609 options: SqliteEffectReplayOptions,
610) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
611 let conn = SqliteConnection::open_in_memory().await?;
612 ensure_effect_schema(&conn).await?;
613 apply_pragmas(&conn, StoreBacking::Memory).await?;
614 Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
615}
616
617impl SqliteEffectReplayInner {
618 fn new(conn: SqliteConnection, options: SqliteEffectReplayOptions) -> Self {
619 let sequence = EFFECT_OWNER_COUNTER.fetch_add(1, Ordering::SeqCst);
620 Self {
621 conn,
622 owner_id: format!(
623 "pid{}-{sequence}-{}",
624 std::process::id(),
625 current_epoch_ms()
626 ),
627 lease_counter: AtomicU64::new(1),
628 replay_mode: AtomicBool::new(false),
629 lease_timings: options.lease_timings,
630 }
631 }
632
633 fn next_lease_token(&self) -> String {
634 let sequence = self.lease_counter.fetch_add(1, Ordering::SeqCst);
635 format!("{}:{sequence}", self.owner_id)
636 }
637}
638
639fn sleep_due_at_ms(envelope: &RuntimeEffectEnvelope, now: u64) -> Option<u64> {
640 match envelope.command {
641 RuntimeEffectCommand::Sleep { duration_ms } => Some(now.saturating_add(duration_ms)),
642 _ => None,
643 }
644}
645
646async fn sleep_until_due(due_at_ms: Option<u64>) {
647 let Some(due_at_ms) = due_at_ms else {
648 return;
649 };
650 let now = current_epoch_ms();
651 if due_at_ms > now {
652 tokio::time::sleep(Duration::from_millis(due_at_ms - now)).await;
653 }
654}
655
656async fn sleep_until_retry(retry_at_ms: u64) {
657 let now = current_epoch_ms();
658 let delay = if retry_at_ms > now {
659 Duration::from_millis(retry_at_ms - now).min(BUSY_POLL)
660 } else {
661 BUSY_POLL
662 };
663 tokio::time::sleep(delay).await;
664}
665
666fn effect_sqlite_error(err: rusqlite::Error) -> RuntimeEffectControllerError {
667 RuntimeEffectControllerError::new("sqlite_effect_replay_store", err.to_string())
668}
669
670fn effect_encode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
671 RuntimeEffectControllerError::new(
672 "sqlite_effect_replay_encode",
673 format!("failed to encode runtime effect replay row: {err}"),
674 )
675}
676
677fn effect_decode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
678 RuntimeEffectControllerError::new(
679 "sqlite_effect_replay_decode",
680 format!("failed to decode runtime effect replay row: {err}"),
681 )
682}