use super::*;
fn lease_owner_from_columns(
owner_id: Option<String>,
incarnation_id: Option<String>,
liveness_json: Option<String>,
) -> Option<LeaseOwnerIdentity> {
owner_id.map(|owner_id| LeaseOwnerIdentity {
incarnation_id: incarnation_id.unwrap_or_else(|| owner_id.clone()),
owner_id,
liveness: liveness_json
.as_deref()
.and_then(|json| serde_json::from_str(json).ok())
.unwrap_or(LeaseOwnerLiveness::Opaque),
})
}
pub(crate) fn decode_turn_input_ingress(
value: String,
) -> Result<lash_core::TurnInputIngress, StoreError> {
serde_json::from_str(&value)
.map_err(|err| StoreError::Backend(format!("failed to decode turn-input ingress: {err}")))
}
pub(crate) fn decode_turn_input_state(
value: String,
) -> Result<lash_core::TurnInputState, StoreError> {
lash_core::TurnInputState::from_wire_str(&value)
.ok_or_else(|| StoreError::Backend(format!("unknown turn-input state `{value}`")))
}
pub(crate) fn decode_turn_input(value: String) -> Result<lash_core::TurnInput, StoreError> {
serde_json::from_str(&value)
.map_err(|err| StoreError::Backend(format!("failed to decode turn input: {err}")))
}
#[derive(Clone, Debug)]
pub(crate) struct PendingTurnInputRow {
pub(crate) enqueue_seq: u64,
pub(crate) input_id: String,
pub(crate) session_id: String,
pub(crate) source_key: Option<String>,
pub(crate) ingress_json: String,
pub(crate) state: String,
pub(crate) input_json: String,
pub(crate) enqueued_at_ms: u64,
pub(crate) claim_id: Option<String>,
pub(crate) claim_fencing_token: u64,
pub(crate) claim_owner: Option<LeaseOwnerIdentity>,
pub(crate) claim_token: Option<String>,
pub(crate) claim_expires_at_ms: u64,
}
pub(crate) fn pending_turn_input_row_from_sql(
row: &rusqlite::Row<'_>,
) -> rusqlite::Result<PendingTurnInputRow> {
Ok(PendingTurnInputRow {
enqueue_seq: row.get::<_, i64>(0)? as u64,
input_id: row.get(1)?,
session_id: row.get(2)?,
source_key: row.get(3)?,
ingress_json: row.get(4)?,
state: row.get(5)?,
input_json: row.get(6)?,
enqueued_at_ms: row.get::<_, i64>(7)? as u64,
claim_id: row.get(8)?,
claim_fencing_token: row.get::<_, i64>(9)? as u64,
claim_owner: lease_owner_from_columns(row.get(10)?, row.get(11)?, row.get(12)?),
claim_token: row.get(13)?,
claim_expires_at_ms: row.get::<_, i64>(14)? as u64,
})
}
pub(crate) fn pending_turn_input_from_row(
row: PendingTurnInputRow,
) -> Result<lash_core::PendingTurnInput, StoreError> {
Ok(lash_core::PendingTurnInput {
input_id: row.input_id,
session_id: row.session_id,
enqueue_seq: row.enqueue_seq,
source_key: row.source_key,
ingress: decode_turn_input_ingress(row.ingress_json)?,
state: decode_turn_input_state(row.state)?,
enqueued_at_ms: row.enqueued_at_ms,
input: decode_turn_input(row.input_json)?,
})
}
pub(crate) fn load_pending_turn_input_by_id_conn(
conn: &Connection,
session_id: &str,
input_id: &str,
) -> Result<Option<lash_core::PendingTurnInput>, StoreError> {
let row = conn
.query_row(
"SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
claim_owner_id, claim_owner_incarnation_id,
claim_owner_liveness_json, claim_token, claim_expires_at_ms
FROM pending_turn_inputs
WHERE session_id = ?1 AND input_id = ?2",
params![session_id, input_id],
pending_turn_input_row_from_sql,
)
.optional()
.map_err(sqlite_error)?;
row.map(pending_turn_input_from_row).transpose()
}
pub(crate) fn load_pending_turn_input_row_by_target_conn(
conn: &Connection,
session_id: &str,
target: &lash_core::PendingTurnInputCancelTarget,
) -> Result<Option<PendingTurnInputRow>, StoreError> {
match target {
lash_core::PendingTurnInputCancelTarget::InputId(input_id) => conn
.query_row(
"SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
claim_owner_id, claim_owner_incarnation_id,
claim_owner_liveness_json, claim_token, claim_expires_at_ms
FROM pending_turn_inputs
WHERE session_id = ?1 AND input_id = ?2",
params![session_id, input_id],
pending_turn_input_row_from_sql,
)
.optional()
.map_err(sqlite_error),
lash_core::PendingTurnInputCancelTarget::SourceKey(source_key) => conn
.query_row(
"SELECT enqueue_seq, input_id, session_id, source_key, ingress_json,
state, input_json, enqueued_at_ms, claim_id, claim_fencing_token,
claim_owner_id, claim_owner_incarnation_id,
claim_owner_liveness_json, claim_token, claim_expires_at_ms
FROM pending_turn_inputs
WHERE session_id = ?1 AND source_key = ?2",
params![session_id, source_key],
pending_turn_input_row_from_sql,
)
.optional()
.map_err(sqlite_error),
}
}
pub(crate) fn pending_turn_input_claim_diagnostics_from_row(
row: &PendingTurnInputRow,
state: lash_core::TurnInputState,
) -> Option<lash_core::PendingTurnInputClaimDiagnostics> {
(row.claim_token.is_some() || matches!(state, lash_core::TurnInputState::Accepted)).then(|| {
lash_core::PendingTurnInputClaimDiagnostics {
state,
claim_id: row.claim_id.clone(),
claim_owner: row.claim_owner.clone(),
claim_expires_at_ms: row.claim_token.as_ref().map(|_| row.claim_expires_at_ms),
claim_fencing_token: row.claim_fencing_token,
}
})
}
#[derive(Clone, Debug)]
pub(crate) struct TurnInputClaimLease {
pub(crate) claim_id: String,
pub(crate) lease_token: String,
pub(crate) fencing_token: u64,
pub(crate) claimed_at_epoch_ms: u64,
pub(crate) expires_at_epoch_ms: u64,
}
impl TurnInputClaimLease {
pub(crate) fn derive(
head: &PendingTurnInputRow,
session_id: &str,
owner: &LeaseOwnerIdentity,
now_epoch_ms: u64,
lease_ttl_ms: u64,
) -> Self {
let fencing_token = head.claim_fencing_token.saturating_add(1);
let claim_id = format!("tic:{}:{fencing_token}", head.enqueue_seq);
let lease_token = format!(
"{:x}",
Sha256::digest(
format!(
"{}:{}:{}:{}:{}",
session_id, owner.owner_id, owner.incarnation_id, claim_id, now_epoch_ms
)
.as_bytes(),
)
);
Self {
claim_id,
lease_token,
fencing_token,
claimed_at_epoch_ms: now_epoch_ms,
expires_at_epoch_ms: now_epoch_ms.saturating_add(lease_ttl_ms),
}
}
}
pub(crate) fn ensure_turn_input_completion_owns_all_inputs(
completed: &lash_core::TurnInputCompletion,
owned_rows: usize,
) -> Result<(), StoreError> {
if owned_rows != completed.input_ids.len() {
return Err(StoreError::TurnInputClaimExpired {
session_id: completed.session_id.clone(),
claim_id: completed.claim_id.clone(),
});
}
Ok(())
}