1use std::path::Path;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
16use std::time::Duration;
17
18use lash_core::{
19 DurabilityTier, EffectHost, EffectScope, PluginError, ProcessCommand, ProcessEffectOutcome,
20 RuntimeEffectCommand, RuntimeEffectController, RuntimeEffectControllerError,
21 RuntimeEffectEnvelope, RuntimeEffectLocalExecutor, RuntimeEffectOutcome, RuntimeError,
22 ScopedEffectController,
23};
24
25use super::*;
26
/// `status` column value written when an effect row is first claimed and
/// kept until the claim is finalized.
const STATUS_IN_PROGRESS: &str = "in_progress";
/// `status` column value written when a claimed effect finishes with an outcome.
const STATUS_COMPLETED: &str = "completed";
/// `status` column value written when a claimed effect finishes with an error.
const STATUS_FAILED: &str = "failed";
/// Lease time-to-live used by `SqliteEffectReplayOptions::default()`.
const DEFAULT_LEASE_TTL: Duration = Duration::from_secs(30);
/// Upper bound on a single wait in `sleep_until_retry` while another owner
/// holds the lease for an effect.
const BUSY_POLL: Duration = Duration::from_millis(25);

/// Process-wide sequence number mixed into every
/// `SqliteEffectReplayInner::owner_id` created by `SqliteEffectReplayInner::new`.
static EFFECT_OWNER_COUNTER: AtomicU64 = AtomicU64::new(1);
34
/// Tunable settings for the SQLite-backed runtime effect replay store.
#[derive(Clone, Debug)]
pub struct SqliteEffectReplayOptions {
    /// How long a claim on an in-progress effect remains valid. Once it has
    /// elapsed, `prepare_effect` lets another caller take the claim over.
    /// Converted to whole milliseconds (minimum 1) by `duration_ms`.
    pub lease_ttl: Duration,
}
40
41impl Default for SqliteEffectReplayOptions {
42 fn default() -> Self {
43 Self {
44 lease_ttl: DEFAULT_LEASE_TTL,
45 }
46 }
47}
48
/// State shared (behind an `Arc`) by `SqliteEffectHost` and every
/// `SqliteRuntimeEffectController` created from it.
struct SqliteEffectReplayInner {
    /// Connection through which all replay-table reads and writes go.
    conn: SqliteConnection,
    /// Identifier written to `lease_owner_id` and used as the prefix of
    /// every lease token this instance issues.
    owner_id: String,
    /// Monotonic counter that makes each issued lease token distinct.
    lease_counter: AtomicU64,
    /// When `true`, `prepare_effect` reports an error instead of recording
    /// a new row for an effect that has no stored record.
    replay_mode: AtomicBool,
    /// Lease time-to-live in milliseconds (never zero; see `duration_ms`).
    lease_ttl_ms: u64,
}
56
/// `EffectHost` implementation backed by a SQLite replay table.
///
/// Cloning is cheap: clones share the same `SqliteEffectReplayInner`.
#[derive(Clone)]
pub struct SqliteEffectHost {
    /// Shared connection, owner identity, and replay settings.
    inner: Arc<SqliteEffectReplayInner>,
}
66
/// `RuntimeEffectController` that records and replays effects for a single
/// `EffectScope` in the SQLite replay table.
#[derive(Clone)]
pub struct SqliteRuntimeEffectController {
    /// Shared connection, owner identity, and replay settings.
    inner: Arc<SqliteEffectReplayInner>,
    /// Scope whose id is used as the `scope_id` key of every row this
    /// controller touches.
    scope: EffectScope,
}
73
/// A lease on one replay row, returned by `prepare_effect` when the caller
/// is the one that must execute the effect.
struct ClaimedEffect {
    /// `scope_id` key of the claimed row.
    scope_id: String,
    /// `replay_key` key of the claimed row.
    replay_key: String,
    /// Hash of the envelope; `finalize_effect` requires the row to still
    /// carry it.
    envelope_hash: String,
    /// Token identifying this claim; `finalize_effect` only updates the row
    /// while it still holds this token.
    lease_token: String,
    /// Epoch-millisecond deadline for `Sleep` effects, `None` for others.
    due_at_ms: Option<u64>,
}
81
/// What `prepare_effect` decided the caller should do with an effect.
enum PreparedEffect {
    /// A completed record exists; return its stored outcome.
    ReplayOutcome {
        /// Outcome decoded from the row's `outcome_json`.
        outcome: Box<RuntimeEffectOutcome>,
        /// Stored `due_at_ms` of the row, if any; `execute_effect` waits
        /// until it has passed before returning the outcome.
        due_at_ms: Option<u64>,
    },
    /// A failed record exists; return its stored error.
    ReplayError(RuntimeEffectControllerError),
    /// The caller now holds the lease and must execute the effect.
    Claimed(ClaimedEffect),
    /// Another claim on the row has not yet expired.
    Busy {
        /// Epoch-millisecond time at which the current lease expires.
        retry_at_ms: u64,
    },
}
93
94impl SqliteEffectHost {
95 pub async fn open(path: &Path) -> tokio_rusqlite::Result<Self> {
96 Self::open_with_options(path, SqliteEffectReplayOptions::default()).await
97 }
98
99 pub async fn open_with_options(
100 path: &Path,
101 options: SqliteEffectReplayOptions,
102 ) -> tokio_rusqlite::Result<Self> {
103 Ok(Self {
104 inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
105 })
106 }
107
108 pub async fn memory() -> tokio_rusqlite::Result<Self> {
109 Self::memory_with_options(SqliteEffectReplayOptions::default()).await
110 }
111
112 pub async fn memory_with_options(
113 options: SqliteEffectReplayOptions,
114 ) -> tokio_rusqlite::Result<Self> {
115 Ok(Self {
116 inner: open_effect_replay_memory_inner(options).await?,
117 })
118 }
119
120 pub fn start_replay(&self) {
123 self.inner.replay_mode.store(true, Ordering::SeqCst);
124 }
125}
126
127impl EffectHost for SqliteEffectHost {
128 fn durability_tier(&self) -> DurabilityTier {
129 DurabilityTier::Durable
130 }
131
132 fn requires_durable_attachment_store(&self) -> bool {
133 true
134 }
135
136 fn scoped<'run>(
137 &'run self,
138 scope: EffectScope,
139 ) -> Result<ScopedEffectController<'run>, RuntimeError> {
140 let controller = SqliteRuntimeEffectController {
141 inner: Arc::clone(&self.inner),
142 scope: scope.clone(),
143 };
144 ScopedEffectController::shared(Arc::new(controller), scope)
145 }
146
147 fn scoped_static(
148 &self,
149 scope: EffectScope,
150 ) -> Result<Option<ScopedEffectController<'static>>, RuntimeError> {
151 let controller = SqliteRuntimeEffectController {
152 inner: Arc::clone(&self.inner),
153 scope: scope.clone(),
154 };
155 Ok(Some(ScopedEffectController::shared(
156 Arc::new(controller),
157 scope,
158 )?))
159 }
160}
161
162impl SqliteRuntimeEffectController {
163 pub async fn open(path: &Path, scope: EffectScope) -> tokio_rusqlite::Result<Self> {
164 Self::open_with_options(path, scope, SqliteEffectReplayOptions::default()).await
165 }
166
167 pub async fn open_with_options(
168 path: &Path,
169 scope: EffectScope,
170 options: SqliteEffectReplayOptions,
171 ) -> tokio_rusqlite::Result<Self> {
172 Ok(Self {
173 inner: open_effect_replay_inner(path, StoreBacking::File, options).await?,
174 scope,
175 })
176 }
177
178 pub async fn memory(scope: EffectScope) -> tokio_rusqlite::Result<Self> {
179 Self::memory_with_options(scope, SqliteEffectReplayOptions::default()).await
180 }
181
182 pub async fn memory_with_options(
183 scope: EffectScope,
184 options: SqliteEffectReplayOptions,
185 ) -> tokio_rusqlite::Result<Self> {
186 Ok(Self {
187 inner: open_effect_replay_memory_inner(options).await?,
188 scope,
189 })
190 }
191
192 pub fn start_replay(&self) {
195 self.inner.replay_mode.store(true, Ordering::SeqCst);
196 }
197
/// Looks up (or creates) the replay row for `envelope` in this controller's
/// scope and decides what the caller must do next.
///
/// Returns:
/// - `ReplayOutcome` / `ReplayError` when a completed / failed record exists;
/// - `Claimed` when a new row was inserted or an expired lease was taken over;
/// - `Busy` when an in-progress row still has an unexpired lease.
///
/// # Errors
/// - `sqlite_effect_replay_key_missing` when the envelope has no replay key;
/// - `sqlite_effect_replay_missing` when replay mode is on and no row exists;
/// - `sqlite_effect_replay_hash_conflict` when the stored hash differs;
/// - `sqlite_effect_replay_corrupt_row` for a missing payload or unknown status;
/// - `sqlite_effect_replay_decode` when a stored payload fails to decode;
/// - `sqlite_effect_replay_store` for any SQLite failure;
/// - whatever `envelope.stable_hash()` reports.
async fn prepare_effect(
    &self,
    envelope: &RuntimeEffectEnvelope,
) -> Result<PreparedEffect, RuntimeEffectControllerError> {
    let replay_key = envelope
        .invocation
        .replay_key()
        .ok_or_else(|| {
            RuntimeEffectControllerError::new(
                "sqlite_effect_replay_key_missing",
                "runtime effect envelope requires replay.key",
            )
        })?
        .to_string();
    let envelope_hash = envelope.stable_hash()?;
    let scope_id = self.scope.id().to_string();
    // `now` is sampled once, before the database call; it is used for the
    // lease-expiry comparison and for the timestamps of a freshly inserted row.
    let now = current_epoch_ms();
    // A token is drawn on every call, even if the row turns out to be
    // replayable or busy and the token goes unused.
    let lease_token = self.inner.next_lease_token();
    let due_at_ms = sleep_due_at_ms(envelope, now);
    let lease_expires_at_ms = now.saturating_add(self.inner.lease_ttl_ms);
    let replay_mode = self.inner.replay_mode.load(Ordering::SeqCst);
    let owner_id = self.inner.owner_id.clone();
    let lease_ttl_ms = self.inner.lease_ttl_ms;

    // The closure returns a nested result: the outer layer carries SQLite
    // errors, the inner layer carries controller-level errors.
    // NOTE(review): presumably `write` runs the closure inside a single
    // transaction — confirm against `SqliteConnection::write`.
    let outcome: Result<PreparedEffect, RuntimeEffectControllerError> = self
        .inner
        .conn
        .write(move |tx| {
            // Columns 4 and 5 (lease_owner_id, lease_token) are selected but
            // not read below.
            let row = tx
                .query_row(
                    "SELECT envelope_hash, status, outcome_json, error_json,
                            lease_owner_id, lease_token, lease_expires_at_ms, due_at_ms
                     FROM runtime_effect_replay
                     WHERE scope_id = ?1 AND replay_key = ?2",
                    params![scope_id.as_str(), replay_key.as_str()],
                    |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, String>(1)?,
                            row.get::<_, Option<String>>(2)?,
                            row.get::<_, Option<String>>(3)?,
                            row.get::<_, i64>(6)?,
                            row.get::<_, Option<i64>>(7)?,
                        ))
                    },
                )
                .optional()?;

            let Some((
                existing_hash,
                status,
                outcome_json,
                error_json,
                lease_expires_row,
                existing_due_row,
            )) = row
            else {
                // No record yet. In replay mode that is an error; otherwise
                // insert an in-progress row owned by this caller.
                if replay_mode {
                    return Ok(Err(RuntimeEffectControllerError::new(
                        "sqlite_effect_replay_missing",
                        format!(
                            "no recorded runtime effect for scope `{scope_id}` and replay key `{replay_key}`"
                        ),
                    )));
                }
                let due_at_param = due_at_ms.map(|value| value as i64);
                tx.execute(
                    "INSERT INTO runtime_effect_replay (
                        scope_id, replay_key, envelope_hash, status, outcome_json,
                        error_json, lease_owner_id, lease_token, lease_expires_at_ms,
                        due_at_ms, created_at_ms, updated_at_ms
                     )
                     VALUES (?1, ?2, ?3, ?4, NULL, NULL, ?5, ?6, ?7, ?8, ?9, ?10)",
                    params![
                        scope_id.as_str(),
                        replay_key.as_str(),
                        envelope_hash.as_str(),
                        STATUS_IN_PROGRESS,
                        owner_id.as_str(),
                        lease_token.as_str(),
                        lease_expires_at_ms as i64,
                        due_at_param,
                        now as i64,
                        now as i64,
                    ],
                )?;
                return Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
                    scope_id,
                    replay_key,
                    envelope_hash,
                    lease_token,
                    due_at_ms,
                })));
            };

            // The same replay key must always carry the same envelope.
            if existing_hash != envelope_hash {
                return Ok(Err(RuntimeEffectControllerError::new(
                    "sqlite_effect_replay_hash_conflict",
                    format!(
                        "runtime effect replay key `{replay_key}` in scope `{scope_id}` was reused with a different envelope hash"
                    ),
                )));
            }

            // From here on `lease_expires_at_ms` shadows the outer value and
            // means the expiry stored on the existing row.
            let lease_expires_at_ms = lease_expires_row as u64;
            let existing_due_at_ms = existing_due_row.map(|value| value as u64);

            match status.as_str() {
                STATUS_COMPLETED => {
                    let Some(json) = outcome_json else {
                        return Ok(Err(RuntimeEffectControllerError::new(
                            "sqlite_effect_replay_corrupt_row",
                            "completed runtime effect row is missing outcome_json",
                        )));
                    };
                    let outcome = match serde_json::from_str(&json) {
                        Ok(outcome) => outcome,
                        Err(err) => return Ok(Err(effect_decode_error(err))),
                    };
                    Ok(Ok(PreparedEffect::ReplayOutcome {
                        outcome: Box::new(outcome),
                        due_at_ms: existing_due_at_ms,
                    }))
                }
                STATUS_FAILED => {
                    let Some(json) = error_json else {
                        return Ok(Err(RuntimeEffectControllerError::new(
                            "sqlite_effect_replay_corrupt_row",
                            "failed runtime effect row is missing error_json",
                        )));
                    };
                    let err = match serde_json::from_str(&json) {
                        Ok(err) => err,
                        Err(err) => return Ok(Err(effect_decode_error(err))),
                    };
                    Ok(Ok(PreparedEffect::ReplayError(err)))
                }
                // Someone else's lease is still valid: report when it expires.
                STATUS_IN_PROGRESS if lease_expires_at_ms > now => Ok(Ok(PreparedEffect::Busy {
                    retry_at_ms: lease_expires_at_ms,
                })),
                // The previous lease has expired: take the row over.
                STATUS_IN_PROGRESS => {
                    // Prefer the stored deadline so a taken-over sleep keeps
                    // the due time recorded by the first claimant.
                    let due_at_ms = existing_due_at_ms.or(due_at_ms);
                    let due_at_param = due_at_ms.map(|value| value as i64);
                    // The new lease expiry and `updated_at_ms` come from
                    // fresh clock reads rather than the earlier `now`.
                    tx.execute(
                        "UPDATE runtime_effect_replay
                         SET lease_owner_id = ?3,
                             lease_token = ?4,
                             lease_expires_at_ms = ?5,
                             due_at_ms = ?6,
                             updated_at_ms = ?7
                         WHERE scope_id = ?1 AND replay_key = ?2",
                        params![
                            scope_id.as_str(),
                            replay_key.as_str(),
                            owner_id.as_str(),
                            lease_token.as_str(),
                            current_epoch_ms().saturating_add(lease_ttl_ms) as i64,
                            due_at_param,
                            current_epoch_ms() as i64,
                        ],
                    )?;
                    Ok(Ok(PreparedEffect::Claimed(ClaimedEffect {
                        scope_id,
                        replay_key,
                        envelope_hash,
                        lease_token,
                        due_at_ms,
                    })))
                }
                other => Ok(Err(RuntimeEffectControllerError::new(
                    "sqlite_effect_replay_corrupt_row",
                    format!("unknown runtime effect replay status `{other}`"),
                ))),
            }
        })
        .await
        .map_err(effect_sqlite_error)?;
    outcome
}
384
385 async fn finalize_effect(
386 &self,
387 claim: &ClaimedEffect,
388 outcome: &Result<RuntimeEffectOutcome, RuntimeEffectControllerError>,
389 ) -> Result<(), RuntimeEffectControllerError> {
390 let (status, outcome_json, error_json) = match outcome {
391 Ok(outcome) => (
392 STATUS_COMPLETED,
393 Some(serde_json::to_string(outcome).map_err(effect_encode_error)?),
394 None,
395 ),
396 Err(err) => (
397 STATUS_FAILED,
398 None,
399 Some(serde_json::to_string(err).map_err(effect_encode_error)?),
400 ),
401 };
402 let now = current_epoch_ms();
403 let scope_id = claim.scope_id.clone();
404 let replay_key = claim.replay_key.clone();
405 let envelope_hash = claim.envelope_hash.clone();
406 let lease_token = claim.lease_token.clone();
407 let status = status.to_string();
408
409 let result: Result<(), RuntimeEffectControllerError> = self
410 .inner
411 .conn
412 .write(move |tx| {
413 let changed = tx.execute(
414 "UPDATE runtime_effect_replay
415 SET status = ?5,
416 outcome_json = ?6,
417 error_json = ?7,
418 lease_owner_id = NULL,
419 lease_token = NULL,
420 lease_expires_at_ms = 0,
421 updated_at_ms = ?8
422 WHERE scope_id = ?1
423 AND replay_key = ?2
424 AND envelope_hash = ?3
425 AND lease_token = ?4
426 AND status = 'in_progress'",
427 params![
428 scope_id.as_str(),
429 replay_key.as_str(),
430 envelope_hash.as_str(),
431 lease_token.as_str(),
432 status.as_str(),
433 outcome_json,
434 error_json,
435 now as i64,
436 ],
437 )?;
438 if changed != 1 {
439 return Ok(Err(RuntimeEffectControllerError::new(
440 "sqlite_effect_replay_lease_lost",
441 format!(
442 "runtime effect replay lease was lost before finalizing scope `{scope_id}` replay key `{replay_key}`"
443 ),
444 )));
445 }
446 Ok(Ok(()))
447 })
448 .await
449 .map_err(effect_sqlite_error)?;
450 result
451 }
452
453 async fn execute_claimed_effect(
454 &self,
455 claim: &ClaimedEffect,
456 envelope: RuntimeEffectEnvelope,
457 local_executor: RuntimeEffectLocalExecutor<'_>,
458 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
459 if matches!(envelope.command, RuntimeEffectCommand::Sleep { .. }) {
460 sleep_until_due(claim.due_at_ms).await;
461 return Ok(RuntimeEffectOutcome::Sleep);
462 }
463 match envelope.command {
464 RuntimeEffectCommand::Process { command } => {
465 let result = self
466 .execute_process_command(command, local_executor)
467 .await?;
468 Ok(RuntimeEffectOutcome::Process { result })
469 }
470 _ => local_executor.execute(envelope).await,
471 }
472 }
473
474 async fn execute_process_command(
475 &self,
476 command: ProcessCommand,
477 local_executor: RuntimeEffectLocalExecutor<'_>,
478 ) -> Result<ProcessEffectOutcome, RuntimeEffectControllerError> {
479 let execution = local_executor.into_process()?;
480 let registry = execution.registry;
481 match command {
482 ProcessCommand::Start {
483 registration,
484 grant,
485 execution_context: _,
486 } => {
487 let registration_id = registration.id.clone();
488 let record = registry.register_process(registration).await?;
489 if let Some(grant) = grant {
490 registry
491 .grant_handle(&grant.owner_scope, ®istration_id, grant.descriptor)
492 .await?;
493 }
494 Ok(ProcessEffectOutcome::Start { record })
495 }
496 ProcessCommand::List { owner_scope, mode } => {
497 let entries = match mode {
498 lash_core::ProcessListMode::Live => {
499 registry.list_live_handle_grants(&owner_scope).await?
500 }
501 lash_core::ProcessListMode::All => {
502 registry.list_handle_grants(&owner_scope).await?
503 }
504 };
505 Ok(ProcessEffectOutcome::List { entries })
506 }
507 ProcessCommand::Transfer {
508 from_scope,
509 to_scope,
510 process_ids,
511 } => {
512 registry
513 .transfer_handle_grants(&from_scope, &to_scope, &process_ids)
514 .await?;
515 Ok(ProcessEffectOutcome::Transfer)
516 }
517 ProcessCommand::DeleteSession { session_id } => {
518 let report = registry.delete_session_process_state(&session_id).await?;
519 for process_id in &report.cancel_process_ids {
520 registry
521 .append_event(
522 process_id,
523 lash_core::ProcessEventAppendRequest::cancel_requested(
524 process_id,
525 Some("session deleted".to_string()),
526 ),
527 )
528 .await?;
529 }
530 Ok(ProcessEffectOutcome::DeleteSession { report })
531 }
532 ProcessCommand::Await { process_id } => {
533 let output = registry.await_process(&process_id).await?;
534 Ok(ProcessEffectOutcome::Await { output })
535 }
536 ProcessCommand::Cancel { process_id, reason } => {
537 registry
538 .append_event(
539 &process_id,
540 lash_core::ProcessEventAppendRequest::cancel_requested(&process_id, reason),
541 )
542 .await?;
543 let record = registry.get_process(&process_id).await.ok_or_else(|| {
544 PluginError::Session(format!("unknown process `{process_id}`"))
545 })?;
546 Ok(ProcessEffectOutcome::Cancel { record })
547 }
548 ProcessCommand::Signal {
549 process_id,
550 request,
551 ..
552 } => {
553 let result = registry.append_event(&process_id, request).await?;
554 Ok(ProcessEffectOutcome::Signal {
555 event: result.event,
556 })
557 }
558 }
559 }
560}
561
562#[async_trait::async_trait]
563impl RuntimeEffectController for SqliteRuntimeEffectController {
564 fn durability_tier(&self) -> DurabilityTier {
565 DurabilityTier::Durable
566 }
567
568 fn requires_durable_attachment_store(&self) -> bool {
569 true
570 }
571
572 async fn execute_effect(
573 &self,
574 envelope: RuntimeEffectEnvelope,
575 local_executor: RuntimeEffectLocalExecutor<'_>,
576 ) -> Result<RuntimeEffectOutcome, RuntimeEffectControllerError> {
577 loop {
578 match self.prepare_effect(&envelope).await? {
579 PreparedEffect::ReplayOutcome { outcome, due_at_ms } => {
580 sleep_until_due(due_at_ms).await;
581 return Ok(*outcome);
582 }
583 PreparedEffect::ReplayError(err) => return Err(err),
584 PreparedEffect::Claimed(claim) => {
585 let result = self
586 .execute_claimed_effect(&claim, envelope, local_executor)
587 .await;
588 let finalize = self.finalize_effect(&claim, &result).await;
589 return match (result, finalize) {
590 (Ok(outcome), Ok(())) => Ok(outcome),
591 (Err(err), Ok(())) => Err(err),
592 (_, Err(err)) => Err(err),
593 };
594 }
595 PreparedEffect::Busy { retry_at_ms } => {
596 sleep_until_retry(retry_at_ms).await;
597 }
598 }
599 }
600 }
601}
602
603async fn open_effect_replay_inner(
604 path: &Path,
605 backing: StoreBacking,
606 options: SqliteEffectReplayOptions,
607) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
608 let conn = SqliteConnection::open(path).await?;
609 ensure_effect_schema(&conn).await?;
610 apply_pragmas(&conn, backing).await?;
611 Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
612}
613
614async fn open_effect_replay_memory_inner(
615 options: SqliteEffectReplayOptions,
616) -> tokio_rusqlite::Result<Arc<SqliteEffectReplayInner>> {
617 let conn = SqliteConnection::open_in_memory().await?;
618 ensure_effect_schema(&conn).await?;
619 apply_pragmas(&conn, StoreBacking::Memory).await?;
620 Ok(Arc::new(SqliteEffectReplayInner::new(conn, options)))
621}
622
623impl SqliteEffectReplayInner {
624 fn new(conn: SqliteConnection, options: SqliteEffectReplayOptions) -> Self {
625 let sequence = EFFECT_OWNER_COUNTER.fetch_add(1, Ordering::SeqCst);
626 Self {
627 conn,
628 owner_id: format!(
629 "pid{}-{sequence}-{}",
630 std::process::id(),
631 current_epoch_ms()
632 ),
633 lease_counter: AtomicU64::new(1),
634 replay_mode: AtomicBool::new(false),
635 lease_ttl_ms: duration_ms(options.lease_ttl),
636 }
637 }
638
639 fn next_lease_token(&self) -> String {
640 let sequence = self.lease_counter.fetch_add(1, Ordering::SeqCst);
641 format!("{}:{sequence}", self.owner_id)
642 }
643}
644
/// Converts `duration` to whole milliseconds, clamped to `1..=u64::MAX`.
///
/// Durations shorter than one millisecond (including zero) become 1;
/// durations too large for `u64` saturate at `u64::MAX`.
fn duration_ms(duration: Duration) -> u64 {
    let saturated = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
    saturated.max(1)
}
653
654fn sleep_due_at_ms(envelope: &RuntimeEffectEnvelope, now: u64) -> Option<u64> {
655 match envelope.command {
656 RuntimeEffectCommand::Sleep { duration_ms } => Some(now.saturating_add(duration_ms)),
657 _ => None,
658 }
659}
660
661async fn sleep_until_due(due_at_ms: Option<u64>) {
662 let Some(due_at_ms) = due_at_ms else {
663 return;
664 };
665 let now = current_epoch_ms();
666 if due_at_ms > now {
667 tokio::time::sleep(Duration::from_millis(due_at_ms - now)).await;
668 }
669}
670
671async fn sleep_until_retry(retry_at_ms: u64) {
672 let now = current_epoch_ms();
673 let delay = if retry_at_ms > now {
674 Duration::from_millis(retry_at_ms - now).min(BUSY_POLL)
675 } else {
676 BUSY_POLL
677 };
678 tokio::time::sleep(delay).await;
679}
680
681fn effect_sqlite_error(err: rusqlite::Error) -> RuntimeEffectControllerError {
682 RuntimeEffectControllerError::new("sqlite_effect_replay_store", err.to_string())
683}
684
685fn effect_encode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
686 RuntimeEffectControllerError::new(
687 "sqlite_effect_replay_encode",
688 format!("failed to encode runtime effect replay row: {err}"),
689 )
690}
691
692fn effect_decode_error(err: serde_json::Error) -> RuntimeEffectControllerError {
693 RuntimeEffectControllerError::new(
694 "sqlite_effect_replay_decode",
695 format!("failed to decode runtime effect replay row: {err}"),
696 )
697}