lash-sqlite-store 0.1.0-alpha.72

SQLite-backed session store for the lash agent runtime.
Documentation
use super::*;

/// Decodes a [`DeliveryPolicy`] from its wire-format string.
///
/// # Errors
///
/// Returns [`StoreError::Backend`] quoting the offending value when
/// `DeliveryPolicy::from_wire_str` does not recognise it.
pub(crate) fn decode_delivery_policy(value: String) -> Result<DeliveryPolicy, StoreError> {
    match DeliveryPolicy::from_wire_str(&value) {
        Some(policy) => Ok(policy),
        None => Err(StoreError::Backend(format!(
            "unknown queued-work delivery policy `{value}`"
        ))),
    }
}

/// Decodes a [`SlotPolicy`] from its wire-format string.
///
/// # Errors
///
/// Returns [`StoreError::Backend`] quoting the offending value when
/// `SlotPolicy::from_wire_str` does not recognise it.
pub(crate) fn decode_slot_policy(value: String) -> Result<SlotPolicy, StoreError> {
    if let Some(policy) = SlotPolicy::from_wire_str(&value) {
        return Ok(policy);
    }
    Err(StoreError::Backend(format!(
        "unknown queued-work slot policy `{value}`"
    )))
}

/// Deserializes a [`MergeKey`] from its JSON text.
///
/// # Errors
///
/// Returns [`StoreError::Backend`] carrying the `serde_json` error message
/// when `value` is not valid JSON for a `MergeKey`.
pub(crate) fn decode_merge_key(value: String) -> Result<MergeKey, StoreError> {
    match serde_json::from_str::<MergeKey>(&value) {
        Ok(merge_key) => Ok(merge_key),
        Err(err) => Err(StoreError::Backend(format!(
            "failed to decode queued-work merge key: {err}"
        ))),
    }
}

pub(crate) fn decode_queued_payload(value: String) -> Result<QueuedWorkPayload, StoreError> {
    serde_json::from_str(&value)
        .map_err(|err| StoreError::Backend(format!("failed to decode queued-work payload: {err}")))
}

/// Assembles a [`LeaseOwnerIdentity`] from three nullable column values.
///
/// Returns `None` when `owner_id` is absent. Otherwise:
///
/// * a missing `incarnation_id` falls back to a copy of `owner_id`;
/// * a missing `liveness_json`, or one that fails to deserialize, falls back
///   to [`LeaseOwnerLiveness::Opaque`] — the parse error is discarded.
fn lease_owner_from_columns(
    owner_id: Option<String>,
    incarnation_id: Option<String>,
    liveness_json: Option<String>,
) -> Option<LeaseOwnerIdentity> {
    // Without an owner id there is no lease owner to report.
    let owner_id = owner_id?;

    let incarnation_id = match incarnation_id {
        Some(incarnation) => incarnation,
        None => owner_id.clone(),
    };

    let liveness = match liveness_json.as_deref() {
        Some(json) => serde_json::from_str(json).unwrap_or(LeaseOwnerLiveness::Opaque),
        None => LeaseOwnerLiveness::Opaque,
    };

    Some(LeaseOwnerIdentity {
        incarnation_id,
        owner_id,
        liveness,
    })
}

pub(crate) fn queued_work_batch_from_conn(
    conn: &Connection,
    row: QueuedBatchRow,
) -> Result<QueuedWorkBatch, StoreError> {
    let mut stmt = conn
        .prepare(
            "SELECT item_id, payload_json
             FROM queued_work_items
             WHERE batch_id = ?1
             ORDER BY item_index ASC",
        )
        .map_err(sqlite_error)?;
    let rows = stmt
        .query_map(params![row.batch_id.as_str()], |item_row| {
            Ok((item_row.get::<_, String>(0)?, item_row.get::<_, String>(1)?))
        })
        .map_err(sqlite_error)?;
    let mut items = Vec::new();
    for item in rows {
        let (item_id, payload_json) = item.map_err(sqlite_error)?;
        items.push(QueuedWorkItem {
            item_id,
            payload: decode_queued_payload(payload_json)?,
        });
    }
    Ok(QueuedWorkBatch {
        batch_id: row.batch_id,
        session_id: row.session_id,
        enqueue_seq: row.enqueue_seq,
        source_key: row.source_key,
        delivery_policy: decode_delivery_policy(row.delivery_policy)?,
        slot_policy: decode_slot_policy(row.slot_policy)?,
        merge_key: decode_merge_key(row.merge_key_json)?,
        available_at_ms: row.available_at_ms,
        enqueued_at_ms: row.enqueued_at_ms,
        items,
    })
}

/// Raw, still-undecoded row of the `queued_work_batches` table.
///
/// Produced by `queued_batch_row_from_sql`, which fills the fields from
/// columns 0..=14 of the SELECT in the same order they are declared here.
/// Policy and merge-key values are kept as the stored strings; decoding into
/// domain types happens in `queued_work_batch_from_conn`.
pub(crate) struct QueuedBatchRow {
    /// `enqueue_seq` column (read as `i64`, cast to `u64`).
    pub(crate) enqueue_seq: u64,
    /// `batch_id` column; also used to look up the batch's items.
    pub(crate) batch_id: String,
    /// `session_id` column.
    pub(crate) session_id: String,
    /// `source_key` column; nullable.
    pub(crate) source_key: Option<String>,
    /// Wire string of the delivery policy, decoded by `decode_delivery_policy`.
    pub(crate) delivery_policy: String,
    /// Wire string of the slot policy, decoded by `decode_slot_policy`.
    pub(crate) slot_policy: String,
    /// JSON text of the merge key, decoded by `decode_merge_key`.
    pub(crate) merge_key_json: String,
    /// `available_at_ms` column (read as `i64`, cast to `u64`).
    pub(crate) available_at_ms: u64,
    /// `enqueued_at_ms` column (read as `i64`, cast to `u64`).
    pub(crate) enqueued_at_ms: u64,
    /// `claim_fencing_token` column (read as `i64`, cast to `u64`).
    /// Not consumed by `queued_work_batch_from_conn`.
    pub(crate) claim_fencing_token: u64,
    /// Owner identity built from the `claim_owner_id`,
    /// `claim_owner_incarnation_id` and `claim_owner_liveness_json` columns;
    /// `None` when `claim_owner_id` is NULL.
    pub(crate) claim_owner: Option<LeaseOwnerIdentity>,
    /// `claim_token` column; nullable.
    pub(crate) claim_token: Option<String>,
    /// `claim_expires_at_ms` column (read as `i64`, cast to `u64`).
    // NOTE(review): presumably 0 when the batch is unclaimed — confirm against the schema.
    pub(crate) claim_expires_at_ms: u64,
}

/// Maps one SQL result row onto a [`QueuedBatchRow`].
///
/// Columns are read by position (0..=14), so the SELECT feeding this mapper
/// must list them in the order used by `load_queued_batch_by_id_conn`.
///
/// # Errors
///
/// Propagates the first `rusqlite` error raised while reading a column.
pub(crate) fn queued_batch_row_from_sql(
    row: &rusqlite::Row<'_>,
) -> rusqlite::Result<QueuedBatchRow> {
    // Integer columns are read as i64 and reinterpreted as u64 with a plain
    // `as` cast, so negative stored values wrap rather than error.
    // NOTE(review): confirm writers rely on this round-trip before tightening it.
    let unsigned_at = |column: usize| -> rusqlite::Result<u64> {
        row.get::<_, i64>(column).map(|signed| signed as u64)
    };

    let enqueue_seq = unsigned_at(0)?;
    let batch_id: String = row.get(1)?;
    let session_id: String = row.get(2)?;
    let source_key: Option<String> = row.get(3)?;
    let delivery_policy: String = row.get(4)?;
    let slot_policy: String = row.get(5)?;
    let merge_key_json: String = row.get(6)?;
    let available_at_ms = unsigned_at(7)?;
    let enqueued_at_ms = unsigned_at(8)?;
    let claim_fencing_token = unsigned_at(9)?;

    let owner_id: Option<String> = row.get(10)?;
    let owner_incarnation_id: Option<String> = row.get(11)?;
    let owner_liveness_json: Option<String> = row.get(12)?;
    let claim_owner =
        lease_owner_from_columns(owner_id, owner_incarnation_id, owner_liveness_json);

    let claim_token: Option<String> = row.get(13)?;
    let claim_expires_at_ms = unsigned_at(14)?;

    Ok(QueuedBatchRow {
        enqueue_seq,
        batch_id,
        session_id,
        source_key,
        delivery_policy,
        slot_policy,
        merge_key_json,
        available_at_ms,
        enqueued_at_ms,
        claim_fencing_token,
        claim_owner,
        claim_token,
        claim_expires_at_ms,
    })
}

/// Loads the queued-work batch with the given `batch_id`, including its items.
///
/// Returns `Ok(None)` when no row in `queued_work_batches` matches.
///
/// # Errors
///
/// Returns a [`StoreError`] if the lookup query fails (mapped through
/// `sqlite_error`) or if `queued_work_batch_from_conn` fails to expand the row.
pub(crate) fn load_queued_batch_by_id_conn(
    conn: &Connection,
    batch_id: &str,
) -> Result<Option<QueuedWorkBatch>, StoreError> {
    let lookup = conn
        .query_row(
            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
                    slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
                    claim_fencing_token, claim_owner_id, claim_owner_incarnation_id,
                    claim_owner_liveness_json, claim_token, claim_expires_at_ms
             FROM queued_work_batches
             WHERE batch_id = ?1",
            params![batch_id],
            queued_batch_row_from_sql,
        )
        .optional()
        .map_err(sqlite_error)?;

    match lookup {
        Some(batch_row) => queued_work_batch_from_conn(conn, batch_row).map(Some),
        None => Ok(None),
    }
}

/// Checks a [`QueuedWorkCompletion`] against the batches currently claimed
/// under it.
///
/// Counts the rows in `queued_work_batches` whose `session_id`, `claim_id`
/// and `claim_token` match the completion's `session_id`, `claim_id` and
/// `lease_token`, then hands that count to
/// `ensure_completion_owns_all_batches`, whose result is returned as-is.
///
/// # Errors
///
/// Returns a [`StoreError`] if the count query fails (mapped through
/// `sqlite_error`) or if `ensure_completion_owns_all_batches` rejects the count.
pub(crate) fn ensure_queued_work_completion_conn(
    conn: &Connection,
    completed: &QueuedWorkCompletion,
) -> Result<(), StoreError> {
    let mut count_stmt = conn
        .prepare(
            "SELECT COUNT(*)
             FROM queued_work_batches
             WHERE session_id = ?1
               AND claim_id = ?2
               AND claim_token = ?3",
        )
        .map_err(sqlite_error)?;

    let matching_rows: i64 = count_stmt
        .query_row(
            params![
                completed.session_id,
                completed.claim_id,
                completed.lease_token
            ],
            |count_row| count_row.get::<_, i64>(0),
        )
        .map_err(sqlite_error)?;

    // COUNT(*) is never negative, so the cast only changes the integer type.
    ensure_completion_owns_all_batches(completed, matching_rows as usize)
}