#[async_trait::async_trait]
impl ProcessRegistry for PostgresProcessRegistry {
fn durability_tier(&self) -> DurabilityTier {
DurabilityTier::Durable
}
async fn register_process(
&self,
registration: ProcessRegistration,
) -> Result<ProcessRecord, PluginError> {
let (registration, registration_hash) =
lash_core::runtime::prepare_process_registration(registration)?;
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
if let Some(existing) = load_process_tx(&mut tx, ®istration.id).await? {
if existing.registration_hash == registration_hash {
tx.commit().await.map_err(plugin_sqlx_error)?;
return Ok(existing);
}
return Err(PluginError::Session(format!(
"process `{}` registration hash conflict: existing {}, new {}",
registration.id, existing.registration_hash, registration_hash
)));
}
let now = current_epoch_ms();
let record =
ProcessRecord::from_prepared_registration(registration, registration_hash, now);
let record_json = serde_json::to_string(&record).map_err(process_decode_error)?;
sqlx::query(
"INSERT INTO lash_processes (
process_id, registration_hash, owner_scope_id,
created_at_ms, updated_at_ms, status, record_json
)
VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind(&record.id)
.bind(&record.registration_hash)
.bind(record.originator_scope_id().as_str())
.bind(record.created_at_ms as i64)
.bind(record.updated_at_ms as i64)
.bind(process_status_label(&record))
.bind(record_json)
.execute(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
tx.commit().await.map_err(plugin_sqlx_error)?;
Ok(record)
}
async fn set_external_ref(
&self,
process_id: &str,
external_ref: ProcessExternalRef,
) -> Result<ProcessRecord, PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
let mut record = load_process_tx(&mut tx, process_id)
.await?
.ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
record.external_ref = Some(external_ref);
record.updated_at_ms = current_epoch_ms();
save_process_tx(&mut tx, &record).await?;
tx.commit().await.map_err(plugin_sqlx_error)?;
Ok(record)
}
async fn grant_handle(
&self,
session_scope: &SessionScope,
process_id: &str,
descriptor: ProcessHandleDescriptor,
) -> Result<ProcessHandleGrant, PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
if load_process_tx(&mut tx, process_id).await?.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
sqlx::query(
"INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
VALUES ($1, $2, $3, $4)
ON CONFLICT (scope_id, process_id) DO UPDATE SET
session_id = EXCLUDED.session_id,
descriptor_json = EXCLUDED.descriptor_json",
)
.bind(&session_scope.session_id)
.bind(session_scope.id().as_str())
.bind(process_id)
.bind(serde_json::to_string(&descriptor).map_err(process_decode_error)?)
.execute(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
tx.commit().await.map_err(plugin_sqlx_error)?;
Ok(ProcessHandleGrant {
session_id: session_scope.session_id.clone(),
process_id: process_id.to_string(),
descriptor,
})
}
async fn revoke_handle(
&self,
session_scope: &SessionScope,
process_id: &str,
) -> Result<(), PluginError> {
sqlx::query(
"DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
)
.bind(session_scope.id().as_str())
.bind(process_id)
.execute(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
Ok(())
}
async fn transfer_handle_grants(
&self,
from_scope: &SessionScope,
to_scope: &SessionScope,
process_ids: &[String],
) -> Result<(), PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
for process_id in process_ids {
let descriptor_json: Option<String> = sqlx::query_scalar(
"SELECT descriptor_json FROM lash_process_handle_grants
WHERE scope_id = $1 AND process_id = $2",
)
.bind(from_scope.id().as_str())
.bind(process_id)
.fetch_optional(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
let Some(descriptor_json) = descriptor_json else {
return Err(PluginError::Session(format!(
"process handle `{process_id}` is not granted to session `{}`",
from_scope.session_id
)));
};
sqlx::query(
"DELETE FROM lash_process_handle_grants WHERE scope_id = $1 AND process_id = $2",
)
.bind(from_scope.id().as_str())
.bind(process_id)
.execute(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
sqlx::query(
"INSERT INTO lash_process_handle_grants (session_id, scope_id, process_id, descriptor_json)
VALUES ($1, $2, $3, $4)
ON CONFLICT (scope_id, process_id) DO UPDATE SET
session_id = EXCLUDED.session_id,
descriptor_json = EXCLUDED.descriptor_json",
)
.bind(&to_scope.session_id)
.bind(to_scope.id().as_str())
.bind(process_id)
.bind(descriptor_json)
.execute(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
}
tx.commit().await.map_err(plugin_sqlx_error)
}
async fn list_handle_grants(
&self,
session_scope: &SessionScope,
) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
list_grants_for_scope(&self.pool, session_scope, false).await
}
async fn list_live_handle_grants(
&self,
session_scope: &SessionScope,
) -> Result<Vec<ProcessHandleGrantEntry>, PluginError> {
list_grants_for_scope(&self.pool, session_scope, true).await
}
async fn has_handle_grant(
&self,
session_scope: &SessionScope,
process_id: &str,
) -> Result<bool, PluginError> {
let exists: Option<i64> = sqlx::query_scalar(
"SELECT 1::BIGINT FROM lash_process_handle_grants
WHERE scope_id = $1 AND process_id = $2
LIMIT 1",
)
.bind(session_scope.id().as_str())
.bind(process_id)
.fetch_optional(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
Ok(exists.is_some())
}
async fn handle_grants_for_process(
&self,
process_id: &str,
) -> Result<Vec<ProcessHandleGrant>, PluginError> {
if load_process(&self.pool, process_id).await?.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
let rows = sqlx::query(
"SELECT session_id, descriptor_json
FROM lash_process_handle_grants
WHERE process_id = $1
ORDER BY session_id ASC, scope_id ASC",
)
.bind(process_id)
.fetch_all(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
let mut grants = Vec::new();
for row in rows {
let descriptor_json: String = row.get(1);
grants.push(ProcessHandleGrant {
session_id: row.get(0),
process_id: process_id.to_string(),
descriptor: serde_json::from_str(&descriptor_json).map_err(process_decode_error)?,
});
}
Ok(grants)
}
async fn delete_session_process_state(
&self,
session_id: &str,
) -> Result<lash_core::ProcessSessionDeleteReport, PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
let rows = sqlx::query(
"SELECT g.process_id, p.record_json
FROM lash_process_handle_grants g
JOIN lash_processes p ON p.process_id = g.process_id
WHERE g.session_id = $1
ORDER BY g.process_id ASC",
)
.bind(session_id)
.fetch_all(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
let mut removed = Vec::new();
for row in rows {
let process_id: String = row.get(0);
let record_json: String = row.get(1);
let record: ProcessRecord =
serde_json::from_str(&record_json).map_err(process_decode_error)?;
removed.push((process_id, record));
}
let deleted_wake_count = 0;
let revoked = sqlx::query("DELETE FROM lash_process_handle_grants WHERE session_id = $1")
.bind(session_id)
.execute(&mut *tx)
.await
.map_err(plugin_sqlx_error)?
.rows_affected() as usize;
let mut orphaned_process_ids = Vec::new();
let mut preserved_process_ids = Vec::new();
for (process_id, record) in removed {
if record.is_terminal() {
continue;
}
let remaining: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM lash_process_handle_grants WHERE process_id = $1",
)
.bind(&process_id)
.fetch_one(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
if remaining == 0 {
orphaned_process_ids.push(process_id);
} else {
preserved_process_ids.push(process_id);
}
}
let rows = sqlx::query(
"SELECT process_id, record_json
FROM lash_processes
ORDER BY process_id ASC
FOR UPDATE",
)
.fetch_all(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
for row in rows {
let record_json: String = row.get(1);
let mut record: ProcessRecord =
serde_json::from_str(&record_json).map_err(process_decode_error)?;
if record.clear_wake_target_for_session(session_id) {
save_process_tx(&mut tx, &record).await?;
}
}
tx.commit().await.map_err(plugin_sqlx_error)?;
orphaned_process_ids.sort();
orphaned_process_ids.dedup();
preserved_process_ids.sort();
preserved_process_ids.dedup();
Ok(lash_core::ProcessSessionDeleteReport {
session_id: session_id.to_string(),
revoked_handle_count: revoked,
deleted_wake_count,
orphaned_process_ids,
preserved_process_ids,
})
}
async fn append_event(
&self,
process_id: &str,
request: ProcessEventAppendRequest,
) -> Result<ProcessEventAppendResult, PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
let mut record = load_process_tx(&mut tx, process_id)
.await?
.ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
let replay_lookup =
if let Some(replay_key) = request.replay.as_ref().map(|r| r.key.as_str()) {
load_event_by_key_tx(&mut tx, process_id, replay_key).await?
} else {
None
};
let sequence: i64 = sqlx::query_scalar(
"SELECT COALESCE(MAX(sequence), 0) + 1 FROM lash_process_events WHERE process_id = $1",
)
.bind(process_id)
.fetch_one(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
let occurred_at_ms = current_epoch_ms();
let prepared = lash_core::runtime::prepare_process_event_append(
&record,
request,
sequence as u64,
replay_lookup,
occurred_at_ms,
)?;
match prepared {
lash_core::ProcessEventAppendPlan::Replay {
event,
repair_status,
wake_delivery,
occurred_at_ms,
} => {
let repaired = if let Some(status) = repair_status {
lash_core::apply_process_status_projection(&mut record, status, occurred_at_ms);
save_process_tx(&mut tx, &record).await?;
true
} else {
false
};
tx.commit().await.map_err(plugin_sqlx_error)?;
if repaired {
self.notify.notify_waiters();
}
Ok(ProcessEventAppendResult {
event,
wake_delivery,
})
}
lash_core::ProcessEventAppendPlan::Insert {
event,
payload_hash,
status_update,
wake_delivery,
occurred_at_ms,
} => {
sqlx::query(
"INSERT INTO lash_process_events (
process_id, sequence, event_type, payload_hash, idempotency_key,
occurred_at_ms, event_json
)
VALUES ($1, $2, $3, $4, $5, $6, $7)",
)
.bind(process_id)
.bind(sequence)
.bind(event.event_type.as_str())
.bind(&payload_hash)
.bind(event.invocation.replay_key())
.bind(occurred_at_ms as i64)
.bind(serde_json::to_string(&event).map_err(process_decode_error)?)
.execute(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
if let Some(status) = status_update {
lash_core::apply_process_status_projection(&mut record, status, occurred_at_ms);
} else {
record.updated_at_ms = occurred_at_ms;
}
save_process_tx(&mut tx, &record).await?;
tx.commit().await.map_err(plugin_sqlx_error)?;
self.notify.notify_waiters();
Ok(ProcessEventAppendResult {
event,
wake_delivery,
})
}
}
}
async fn events_after(
&self,
process_id: &str,
after_sequence: u64,
) -> Result<Vec<ProcessEvent>, PluginError> {
if load_process(&self.pool, process_id).await?.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
let rows = sqlx::query(
"SELECT event_json FROM lash_process_events
WHERE process_id = $1 AND sequence > $2
ORDER BY sequence ASC",
)
.bind(process_id)
.bind(after_sequence as i64)
.fetch_all(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
let mut events = Vec::new();
for row in rows {
let json: String = row.get(0);
events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
}
Ok(events)
}
async fn count_events_through(
&self,
process_id: &str,
event_type: &str,
up_to_sequence: u64,
) -> Result<u64, PluginError> {
if load_process(&self.pool, process_id).await?.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
let row = sqlx::query(
"SELECT COUNT(*) FROM lash_process_events
WHERE process_id = $1 AND event_type = $2 AND sequence <= $3",
)
.bind(process_id)
.bind(event_type)
.bind(up_to_sequence as i64)
.fetch_one(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
let count: i64 = row.get(0);
Ok(count as u64)
}
async fn recent_events(
&self,
process_id: &str,
limit: usize,
) -> Result<Vec<ProcessEvent>, PluginError> {
if load_process(&self.pool, process_id).await?.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
let rows = sqlx::query(
"SELECT event_json FROM lash_process_events
WHERE process_id = $1
ORDER BY sequence DESC
LIMIT $2",
)
.bind(process_id)
.bind(limit as i64)
.fetch_all(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
let mut events = Vec::new();
for row in rows {
let json: String = row.get(0);
events.push(serde_json::from_str(&json).map_err(process_decode_error)?);
}
events.reverse();
Ok(events)
}
async fn wake_events_after(
&self,
process_id: &str,
after_sequence: u64,
) -> Result<Vec<ProcessEvent>, PluginError> {
let rows = sqlx::query("SELECT sequence FROM lash_process_wake_acks WHERE process_id = $1")
.bind(process_id)
.fetch_all(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
let acked = rows
.into_iter()
.map(|row| row.get::<i64, _>(0) as u64)
.collect::<HashSet<_>>();
Ok(self
.events_after(process_id, after_sequence)
.await?
.into_iter()
.filter(|event| event.semantics.wake.is_some() && !acked.contains(&event.sequence))
.collect())
}
async fn wait_event_after(
&self,
process_id: &str,
event_type: &str,
after_sequence: u64,
) -> Result<ProcessEvent, PluginError> {
loop {
if let Some(event) = self
.events_after(process_id, after_sequence)
.await?
.into_iter()
.find(|event| event.event_type == event_type)
{
return Ok(event);
}
tokio::select! {
_ = self.notify.notified() => {}
_ = tokio::time::sleep(Duration::from_millis(50)) => {}
}
}
}
async fn await_process(&self, process_id: &str) -> Result<ProcessAwaitOutput, PluginError> {
loop {
let record = load_process(&self.pool, process_id)
.await?
.ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
if let Some(await_output) = record.status.await_output() {
return Ok(await_output.clone());
}
tokio::select! {
_ = self.notify.notified() => {}
_ = tokio::time::sleep(Duration::from_millis(50)) => {}
}
}
}
async fn complete_process(
&self,
process_id: &str,
await_output: ProcessAwaitOutput,
) -> Result<ProcessRecord, PluginError> {
let event_type = match await_output.terminal_state() {
lash_core::ProcessTerminalState::Completed => "process.completed",
lash_core::ProcessTerminalState::Failed => "process.failed",
lash_core::ProcessTerminalState::Cancelled => "process.cancelled",
};
self.append_event(
process_id,
ProcessEventAppendRequest::new(
event_type,
serde_json::json!({ "await_output": await_output }),
)
.with_replay_key(format!("process:{process_id}:terminal:{event_type}")),
)
.await?;
load_process(&self.pool, process_id).await?.ok_or_else(|| {
PluginError::Session(format!(
"unknown process `{process_id}` after terminal event"
))
})
}
async fn set_process_wait(
&self,
process_id: &str,
wait: lash_core::WaitState,
) -> Result<ProcessRecord, PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
let mut record = load_process_tx(&mut tx, process_id)
.await?
.ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
if record.is_terminal() {
return Err(PluginError::Session(format!(
"terminal process `{process_id}` cannot enter a wait state"
)));
}
record.wait = Some(wait);
record.updated_at_ms = current_epoch_ms();
save_process_tx(&mut tx, &record).await?;
tx.commit().await.map_err(plugin_sqlx_error)?;
self.notify.notify_waiters();
Ok(record)
}
async fn clear_process_wait(&self, process_id: &str) -> Result<ProcessRecord, PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
let mut record = load_process_tx(&mut tx, process_id)
.await?
.ok_or_else(|| PluginError::Session(format!("unknown process `{process_id}`")))?;
record.wait = None;
record.updated_at_ms = current_epoch_ms();
save_process_tx(&mut tx, &record).await?;
tx.commit().await.map_err(plugin_sqlx_error)?;
self.notify.notify_waiters();
Ok(record)
}
async fn get_process(&self, process_id: &str) -> Option<ProcessRecord> {
load_process(&self.pool, process_id).await.ok().flatten()
}
async fn list_processes(
&self,
filter: &lash_core::ProcessListFilter,
) -> Result<Vec<ProcessRecord>, PluginError> {
let rows = sqlx::query(
"SELECT record_json FROM lash_processes
ORDER BY process_id ASC",
)
.fetch_all(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
let mut records = Vec::new();
for row in rows {
let json: String = row.get(0);
let record: ProcessRecord =
serde_json::from_str(&json).map_err(process_decode_error)?;
if filter.matches_record(&record) {
records.push(record);
}
}
Ok(records)
}
async fn ack_wake(&self, process_id: &str, sequence: u64) -> Result<(), PluginError> {
if load_process(&self.pool, process_id).await?.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
sqlx::query(
"INSERT INTO lash_process_wake_acks (process_id, sequence)
VALUES ($1, $2)
ON CONFLICT DO NOTHING",
)
.bind(process_id)
.bind(sequence as i64)
.execute(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
Ok(())
}
async fn list_non_terminal(&self) -> Result<Vec<ProcessRecord>, PluginError> {
let rows = sqlx::query(
"SELECT record_json FROM lash_processes
WHERE status = 'running'
ORDER BY process_id ASC",
)
.fetch_all(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
let mut records = Vec::new();
for row in rows {
let json: String = row.get(0);
records.push(serde_json::from_str(&json).map_err(process_decode_error)?);
}
Ok(records)
}
async fn claim_process_lease(
&self,
process_id: &str,
owner_id: &str,
lease_ttl_ms: u64,
) -> Result<ProcessLease, PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
if load_process_tx(&mut tx, process_id).await?.is_none() {
return Err(PluginError::Session(format!(
"unknown process `{process_id}`"
)));
}
let now = current_epoch_ms();
let current = load_process_lease_tx(&mut tx, process_id).await?;
if let Some(current) = current.as_ref()
&& current.expires_at_epoch_ms > now
&& current.owner_id != owner_id
{
return Err(process_lease_conflict(process_id, current));
}
let existing_fence: Option<i64> = sqlx::query_scalar(
"SELECT lease_fencing_token FROM lash_process_leases WHERE process_id = $1 FOR UPDATE",
)
.bind(process_id)
.fetch_optional(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
let fencing_token = existing_fence.unwrap_or(0) as u64 + 1;
let lease = ProcessLease {
schema_version: PROCESS_LEASE_SCHEMA_VERSION,
process_id: process_id.to_string(),
owner_id: owner_id.to_string(),
lease_token: format!(
"{:x}",
Sha256::digest(format!("{process_id}:{owner_id}:{now}:{fencing_token}").as_bytes())
),
fencing_token,
claimed_at_epoch_ms: now,
expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
};
sqlx::query(
"INSERT INTO lash_process_leases (
process_id, lease_owner_id, lease_token, lease_fencing_token,
lease_claimed_at_ms, lease_expires_at_ms
)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (process_id) DO UPDATE SET
lease_owner_id = EXCLUDED.lease_owner_id,
lease_token = EXCLUDED.lease_token,
lease_fencing_token = EXCLUDED.lease_fencing_token,
lease_claimed_at_ms = EXCLUDED.lease_claimed_at_ms,
lease_expires_at_ms = EXCLUDED.lease_expires_at_ms",
)
.bind(&lease.process_id)
.bind(&lease.owner_id)
.bind(&lease.lease_token)
.bind(lease.fencing_token as i64)
.bind(lease.claimed_at_epoch_ms as i64)
.bind(lease.expires_at_epoch_ms as i64)
.execute(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
tx.commit().await.map_err(plugin_sqlx_error)?;
Ok(lease)
}
async fn renew_process_lease(
&self,
lease: &ProcessLease,
lease_ttl_ms: u64,
) -> Result<ProcessLease, PluginError> {
let mut tx = self.pool.begin().await.map_err(plugin_sqlx_error)?;
let now = current_epoch_ms();
let current = load_process_lease_tx(&mut tx, &lease.process_id).await?;
if !guard_lease(current.as_ref(), &lease.lease_token, now) {
return Err(process_lease_expired(&lease.process_id));
}
let renewed = ProcessLease {
expires_at_epoch_ms: now.saturating_add(lease_ttl_ms),
..lease.clone()
};
sqlx::query(
"UPDATE lash_process_leases
SET lease_expires_at_ms = $2
WHERE process_id = $1 AND lease_token = $3",
)
.bind(&renewed.process_id)
.bind(renewed.expires_at_epoch_ms as i64)
.bind(&renewed.lease_token)
.execute(&mut *tx)
.await
.map_err(plugin_sqlx_error)?;
tx.commit().await.map_err(plugin_sqlx_error)?;
Ok(renewed)
}
async fn complete_process_lease(
&self,
completion: &ProcessLeaseCompletion,
) -> Result<(), PluginError> {
sqlx::query(
"UPDATE lash_process_leases
SET lease_owner_id = NULL,
lease_token = NULL,
lease_claimed_at_ms = 0,
lease_expires_at_ms = 0
WHERE process_id = $1 AND lease_token = $2",
)
.bind(&completion.process_id)
.bind(&completion.lease_token)
.execute(&self.pool)
.await
.map_err(plugin_sqlx_error)?;
Ok(())
}
}