lash-sqlite-store 0.1.0-alpha.85

SQLite-backed session store for the lash agent runtime.
Documentation
use super::*;

fn lease_owner_from_columns(
    owner_id: Option<String>,
    incarnation_id: Option<String>,
    liveness_json: Option<String>,
) -> Option<LeaseOwnerIdentity> {
    owner_id.map(|owner_id| LeaseOwnerIdentity {
        incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
        owner_id,
        liveness: liveness_json
            .as_deref()
            .and_then(|json| serde_json::from_str(json).ok())
            .unwrap_or(LeaseOwnerLiveness::Opaque),
    })
}

pub(crate) fn decode_turn_input_ingress(
    value: String,
) -> Result<lash_core::TurnInputIngress, StoreError> {
    serde_json::from_str(&value)
        .map_err(|err| StoreError::Backend(format!("failed to decode turn-input ingress: {err}")))
}

pub(crate) fn decode_turn_input_state(
    value: String,
) -> Result<lash_core::TurnInputState, StoreError> {
    lash_core::TurnInputState::from_wire_str(&value)
        .ok_or_else(|| StoreError::Backend(format!("unknown turn-input state `{value}`")))
}

pub(crate) fn decode_turn_input(value: String) -> Result<lash_core::TurnInput, StoreError> {
    serde_json::from_str(&value)
        .map_err(|err| StoreError::Backend(format!("failed to decode turn input: {err}")))
}

#[derive(Clone, Debug)]
pub(crate) struct PendingTurnInputRow {
    pub(crate) enqueue_seq: u64,
    pub(crate) input_id: String,
    pub(crate) session_id: String,
    pub(crate) source_key: Option<String>,
    pub(crate) ingress_json: String,
    pub(crate) state: String,
    pub(crate) input_json: String,
    pub(crate) enqueued_at_ms: u64,
    pub(crate) claim_id: Option<String>,
    pub(crate) claim_fencing_token: u64,
    pub(crate) claim_owner: Option<LeaseOwnerIdentity>,
    pub(crate) claim_token: Option<String>,
    pub(crate) claim_expires_at_ms: u64,
}

pub(crate) fn pending_turn_input_row_from_sql(
    row: &rusqlite::Row<'_>,
) -> rusqlite::Result<PendingTurnInputRow> {
    Ok(PendingTurnInputRow {
        enqueue_seq: row.get::<_, i64>(0)? as u64,
        input_id: row.get(1)?,
        session_id: row.get(2)?,
        source_key: row.get(3)?,
        ingress_json: row.get(4)?,
        state: row.get(5)?,
        input_json: row.get(6)?,
        enqueued_at_ms: row.get::<_, i64>(7)? as u64,
        claim_id: row.get(8)?,
        claim_fencing_token: row.get::<_, i64>(9)? as u64,
        claim_owner: lease_owner_from_columns(row.get(10)?, row.get(11)?, row.get(12)?),
        claim_token: row.get(13)?,
        claim_expires_at_ms: row.get::<_, i64>(14)? as u64,
    })
}

pub(crate) fn pending_turn_input_from_row(
    row: PendingTurnInputRow,
) -> Result<lash_core::PendingTurnInput, StoreError> {
    Ok(lash_core::PendingTurnInput {
        input_id: row.input_id,
        session_id: row.session_id,
        enqueue_seq: row.enqueue_seq,
        source_key: row.source_key,
        ingress: decode_turn_input_ingress(row.ingress_json)?,
        state: decode_turn_input_state(row.state)?,
        enqueued_at_ms: row.enqueued_at_ms,
        input: decode_turn_input(row.input_json)?,
    })
}

pub(crate) fn load_pending_turn_input_by_id_conn(
    conn: &Connection,
    session_id: &str,
    input_id: &str,
) -> Result<Option<lash_core::PendingTurnInput>, StoreError> {
    let row = conn
        .query_row(
            "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
                    state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
                    claim_owner_id, claim_owner_incarnation_id,
                    claim_owner_liveness_json, claim_token, claim_expires_at_ms
             FROM pending_turn_inputs
             WHERE session_id = ?1 AND input_id = ?2",
            params![session_id, input_id],
            pending_turn_input_row_from_sql,
        )
        .optional()
        .map_err(sqlite_error)?;
    row.map(pending_turn_input_from_row).transpose()
}

pub(crate) fn load_pending_turn_input_row_by_target_conn(
    conn: &Connection,
    session_id: &str,
    target: &lash_core::PendingTurnInputCancelTarget,
) -> Result<Option<PendingTurnInputRow>, StoreError> {
    match target {
        lash_core::PendingTurnInputCancelTarget::InputId(input_id) => conn
            .query_row(
                "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
                        state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
                        claim_owner_id, claim_owner_incarnation_id,
                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
                 FROM pending_turn_inputs
                 WHERE session_id = ?1 AND input_id = ?2",
                params![session_id, input_id],
                pending_turn_input_row_from_sql,
            )
            .optional()
            .map_err(sqlite_error),
        lash_core::PendingTurnInputCancelTarget::SourceKey(source_key) => conn
            .query_row(
                "SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
                        state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
                        claim_owner_id, claim_owner_incarnation_id,
                        claim_owner_liveness_json, claim_token, claim_expires_at_ms
                 FROM pending_turn_inputs
                 WHERE session_id = ?1 AND source_key = ?2",
                params![session_id, source_key],
                pending_turn_input_row_from_sql,
            )
            .optional()
            .map_err(sqlite_error),
    }
}

pub(crate) fn pending_turn_input_claim_diagnostics_from_row(
    row: &PendingTurnInputRow,
    state: lash_core::TurnInputState,
) -> Option<lash_core::PendingTurnInputClaimDiagnostics> {
    (row.claim_token.is_some() || matches!(state, lash_core::TurnInputState::Accepted)).then(|| {
        lash_core::PendingTurnInputClaimDiagnostics {
            state,
            claim_id: row.claim_id.clone(),
            claim_owner: row.claim_owner.clone(),
            claim_expires_at_ms: row.claim_token.as_ref().map(|_| row.claim_expires_at_ms),
            claim_fencing_token: row.claim_fencing_token,
        }
    })
}

#[derive(Clone, Debug)]
pub(crate) struct TurnInputClaimLease {
    pub(crate) claim_id: String,
    pub(crate) lease_token: String,
    pub(crate) fencing_token: u64,
    pub(crate) claimed_at_epoch_ms: u64,
    pub(crate) expires_at_epoch_ms: u64,
}

impl TurnInputClaimLease {
    pub(crate) fn derive(
        head: &PendingTurnInputRow,
        session_id: &str,
        owner: &LeaseOwnerIdentity,
        now_epoch_ms: u64,
        lease_ttl_ms: u64,
    ) -> Self {
        let fencing_token = head.claim_fencing_token.saturating_add(1);
        let claim_id = format!("tic:{}:{fencing_token}", head.enqueue_seq);
        let lease_token = format!(
            "{:x}",
            Sha256::digest(
                format!(
                    "{}:{}:{}:{}:{}",
                    session_id, owner.owner_id, owner.incarnation_id, claim_id, now_epoch_ms
                )
                .as_bytes(),
            )
        );
        Self {
            claim_id,
            lease_token,
            fencing_token,
            claimed_at_epoch_ms: now_epoch_ms,
            expires_at_epoch_ms: now_epoch_ms.saturating_add(lease_ttl_ms),
        }
    }
}

pub(crate) fn ensure_turn_input_completion_owns_all_inputs(
    completed: &lash_core::TurnInputCompletion,
    owned_rows: usize,
) -> Result<(), StoreError> {
    if owned_rows != completed.input_ids.len() {
        return Err(StoreError::TurnInputClaimExpired {
            session_id: completed.session_id.clone(),
            claim_id: completed.claim_id.clone(),
        });
    }
    Ok(())
}