use super::*;
/// Converts the stored wire string of a delivery policy into a [`DeliveryPolicy`].
///
/// # Errors
///
/// Returns `StoreError::Backend` (quoting the offending string) when
/// `DeliveryPolicy::from_wire_str` yields `None` for `value`.
pub(crate) fn decode_delivery_policy(value: String) -> Result<DeliveryPolicy, StoreError> {
    match DeliveryPolicy::from_wire_str(&value) {
        Some(policy) => Ok(policy),
        None => Err(StoreError::Backend(format!(
            "unknown queued-work delivery policy `{value}`"
        ))),
    }
}
/// Converts the stored wire string of a slot policy into a [`SlotPolicy`].
///
/// # Errors
///
/// Returns `StoreError::Backend` (quoting the offending string) when
/// `SlotPolicy::from_wire_str` yields `None` for `value`.
pub(crate) fn decode_slot_policy(value: String) -> Result<SlotPolicy, StoreError> {
    if let Some(policy) = SlotPolicy::from_wire_str(&value) {
        return Ok(policy);
    }
    Err(StoreError::Backend(format!(
        "unknown queued-work slot policy `{value}`"
    )))
}
/// Deserializes a JSON-encoded merge key into a [`MergeKey`].
///
/// # Errors
///
/// Returns `StoreError::Backend` carrying the `serde_json` error text when
/// `value` cannot be deserialized.
pub(crate) fn decode_merge_key(value: String) -> Result<MergeKey, StoreError> {
    match serde_json::from_str::<MergeKey>(&value) {
        Ok(merge_key) => Ok(merge_key),
        Err(err) => Err(StoreError::Backend(format!(
            "failed to decode queued-work merge key: {err}"
        ))),
    }
}
/// Deserializes a JSON-encoded item payload into a [`QueuedWorkPayload`].
///
/// # Errors
///
/// Returns `StoreError::Backend` carrying the `serde_json` error text when
/// `value` cannot be deserialized.
pub(crate) fn decode_queued_payload(value: String) -> Result<QueuedWorkPayload, StoreError> {
    let payload = serde_json::from_str(&value).map_err(|err| {
        StoreError::Backend(format!("failed to decode queued-work payload: {err}"))
    })?;
    Ok(payload)
}
pub(crate) fn queued_work_batch_from_conn(
conn: &Connection,
row: QueuedBatchRow,
) -> Result<QueuedWorkBatch, StoreError> {
let mut stmt = conn
.prepare(
"SELECT item_id, payload_json
FROM queued_work_items
WHERE batch_id = ?1
ORDER BY item_index ASC",
)
.map_err(sqlite_error)?;
let rows = stmt
.query_map(params![row.batch_id.as_str()], |item_row| {
Ok((item_row.get::<_, String>(0)?, item_row.get::<_, String>(1)?))
})
.map_err(sqlite_error)?;
let mut items = Vec::new();
for item in rows {
let (item_id, payload_json) = item.map_err(sqlite_error)?;
items.push(QueuedWorkItem {
item_id,
payload: decode_queued_payload(payload_json)?,
});
}
Ok(QueuedWorkBatch {
batch_id: row.batch_id,
session_id: row.session_id,
enqueue_seq: row.enqueue_seq,
source_key: row.source_key,
delivery_policy: decode_delivery_policy(row.delivery_policy)?,
slot_policy: decode_slot_policy(row.slot_policy)?,
merge_key: decode_merge_key(row.merge_key_json)?,
available_at_ms: row.available_at_ms,
enqueued_at_ms: row.enqueued_at_ms,
items,
})
}
/// Raw, not-yet-decoded column values of one `queued_work_batches` row.
///
/// Instances are produced by `queued_batch_row_from_sql`; policy and merge-key
/// columns are kept as the strings read from the database and are decoded
/// later by `queued_work_batch_from_conn`.
pub(crate) struct QueuedBatchRow {
/// Value of the `enqueue_seq` column (read as `i64`, then cast to `u64`).
pub(crate) enqueue_seq: u64,
/// Value of the `batch_id` column; also used to look up the batch's items.
pub(crate) batch_id: String,
/// Value of the `session_id` column.
pub(crate) session_id: String,
/// Value of the `source_key` column; `None` when the column is NULL.
pub(crate) source_key: Option<String>,
/// Undecoded wire string of the delivery policy.
pub(crate) delivery_policy: String,
/// Undecoded wire string of the slot policy.
pub(crate) slot_policy: String,
/// Undecoded JSON text of the merge key.
pub(crate) merge_key_json: String,
/// Value of the `available_at_ms` column (read as `i64`, then cast to `u64`).
pub(crate) available_at_ms: u64,
/// Value of the `enqueued_at_ms` column (read as `i64`, then cast to `u64`).
pub(crate) enqueued_at_ms: u64,
/// Value of the `claim_fencing_token` column (read as `i64`, then cast to
/// `u64`). Not copied into `QueuedWorkBatch` by `queued_work_batch_from_conn`.
pub(crate) claim_fencing_token: u64,
}
/// Row mapper that turns one result row into a [`QueuedBatchRow`].
///
/// Columns are read by position, so the query must select, in this order:
/// `enqueue_seq`, `batch_id`, `session_id`, `source_key`, `delivery_policy`,
/// `slot_policy`, `merge_key_json`, `available_at_ms`, `enqueued_at_ms`,
/// `claim_fencing_token`.
///
/// Integer columns are fetched as `i64` and converted with `as u64`, so a
/// negative stored value wraps around instead of producing an error.
///
/// # Errors
///
/// Propagates the first `rusqlite` error raised while reading a column.
pub(crate) fn queued_batch_row_from_sql(
    row: &rusqlite::Row<'_>,
) -> rusqlite::Result<QueuedBatchRow> {
    // Shared reader for the integer columns that are exposed as `u64`.
    let unsigned_at =
        |idx: usize| -> rusqlite::Result<u64> { row.get::<_, i64>(idx).map(|raw| raw as u64) };

    let enqueue_seq = unsigned_at(0)?;
    let batch_id = row.get(1)?;
    let session_id = row.get(2)?;
    let source_key = row.get(3)?;
    let delivery_policy = row.get(4)?;
    let slot_policy = row.get(5)?;
    let merge_key_json = row.get(6)?;
    let available_at_ms = unsigned_at(7)?;
    let enqueued_at_ms = unsigned_at(8)?;
    let claim_fencing_token = unsigned_at(9)?;

    Ok(QueuedBatchRow {
        enqueue_seq,
        batch_id,
        session_id,
        source_key,
        delivery_policy,
        slot_policy,
        merge_key_json,
        available_at_ms,
        enqueued_at_ms,
        claim_fencing_token,
    })
}
/// Looks up a single batch by `batch_id` and loads it together with its items.
///
/// Returns `Ok(None)` when no `queued_work_batches` row has that id.
///
/// # Errors
///
/// Fails with the mapped SQLite error if the lookup fails, or with whatever
/// `queued_work_batch_from_conn` reports while assembling the batch.
pub(crate) fn load_queued_batch_by_id_conn(
    conn: &Connection,
    batch_id: &str,
) -> Result<Option<QueuedWorkBatch>, StoreError> {
    let fetched = conn
        .query_row(
            "SELECT enqueue_seq, batch_id, session_id, source_key, delivery_policy,
slot_policy, merge_key_json, available_at_ms, enqueued_at_ms,
claim_fencing_token
FROM queued_work_batches
WHERE batch_id = ?1",
            params![batch_id],
            queued_batch_row_from_sql,
        )
        .optional()
        .map_err(sqlite_error)?;

    match fetched {
        Some(batch_row) => queued_work_batch_from_conn(conn, batch_row).map(Some),
        None => Ok(None),
    }
}
/// Checks that the claim referenced by `completed` still covers the expected
/// number of batches.
///
/// Counts the `queued_work_batches` rows whose `session_id`, `claim_id` and
/// `claim_token` equal the completion's `session_id`, `claim_id` and
/// `lease_token`, and compares that count with `completed.batch_ids.len()`.
///
/// NOTE(review): only the row count is compared; the individual ids in
/// `completed.batch_ids` are not matched against the rows here. Confirm that
/// callers rely on exactly this check.
///
/// # Errors
///
/// Returns `StoreError::QueuedWorkClaimExpired` when the counts differ, or the
/// mapped SQLite error if the count query fails.
pub(crate) fn ensure_queued_work_completion_conn(
    conn: &Connection,
    completed: &QueuedWorkCompletion,
) -> Result<(), StoreError> {
    let mut count_stmt = conn
        .prepare(
            "SELECT COUNT(*)
FROM queued_work_batches
WHERE session_id = ?1
AND claim_id = ?2
AND claim_token = ?3",
        )
        .map_err(sqlite_error)?;

    let matching_rows: i64 = count_stmt
        .query_row(
            params![
                completed.session_id,
                completed.claim_id,
                completed.lease_token
            ],
            |count_row| count_row.get(0),
        )
        .map_err(sqlite_error)?;
    let claimed = matching_rows as usize;

    if claimed == completed.batch_ids.len() {
        return Ok(());
    }

    Err(StoreError::QueuedWorkClaimExpired {
        session_id: completed.session_id.clone(),
        claim_id: completed.claim_id.clone(),
    })
}