mod blocking;
mod concurrency;
mod query;
mod recovery;
mod status;
#[cfg(test)]
mod tests;
use std::sync::Arc;
use rustvello_core::error::{RustvelloError, RustvelloResult};
use rustvello_proto::status::InvocationStatusRecord;
use crate::connection::RedisPool;
/// Joins `prefix` and `suffix` into a single owned key string.
///
/// No separator is inserted; the caller's prefix is expected to already end
/// with whatever delimiter it needs.
fn prefixed_key(prefix: &str, suffix: &str) -> String {
    // Slice `concat` sizes the output once from the combined lengths.
    [prefix, suffix].concat()
}
/// Builds the key `{cc_prefix}{task_id}<US>{arg_key}<US>{arg_value}`, where
/// `<US>` is the ASCII unit separator (`0x1F`).
fn cc_pair_redis_key(cc_prefix: &str, task_id: &str, arg_key: &str, arg_value: &str) -> String {
    const UNIT_SEPARATOR: char = '\x1f';

    // Four string parts plus two single-byte separators.
    let total_len = cc_prefix.len() + task_id.len() + arg_key.len() + arg_value.len() + 2;
    let mut key = String::with_capacity(total_len);
    key.push_str(cc_prefix);
    key.push_str(task_id);
    key.push(UNIT_SEPARATOR);
    key.push_str(arg_key);
    key.push(UNIT_SEPARATOR);
    key.push_str(arg_value);
    key
}
/// Holds a shared [`RedisPool`] handle together with the Redis key prefixes
/// derived from it.
///
/// Every `*_prefix` field is filled in by `RedisOrchestrator::new` as
/// `{pool.prefix()}orch:<family>:`, so each key family's prefix is formatted
/// once at construction time.
#[non_exhaustive]
pub struct RedisOrchestrator {
/// Shared connection pool; its `prefix()` is the base of every prefix below.
pub(crate) pool: Arc<RedisPool>,
/// `{pool prefix}orch:status:` — joined with an invocation id by `status_key`.
pub(crate) status_prefix: String,
/// `{pool prefix}orch:task_inv:`.
pub(crate) task_inv_prefix: String,
/// `{pool prefix}orch:call_inv:`.
pub(crate) call_inv_prefix: String,
/// `{pool prefix}orch:waiters:`.
pub(crate) waiters_prefix: String,
/// `{pool prefix}orch:cc:`.
// NOTE(review): presumably the `cc_prefix` passed to `cc_pair_redis_key` — confirm in the submodules.
pub(crate) cc_prefix: String,
/// `{pool prefix}orch:cc_rev:`.
pub(crate) cc_rev_prefix: String,
/// `{pool prefix}orch:heartbeat:`.
pub(crate) heartbeat_prefix: String,
/// `{pool prefix}orch:retries:`.
pub(crate) retries_prefix: String,
}
impl RedisOrchestrator {
    /// Creates an orchestrator on top of `pool`.
    ///
    /// Each key-family prefix is formatted once, as
    /// `{pool.prefix()}orch:<family>:`, and stored on the returned value.
    pub fn new(pool: Arc<RedisPool>) -> Self {
        let base = pool.prefix();
        // All families share the same `{base}orch:<family>:` layout.
        let family = |name: &str| format!("{base}orch:{name}:");

        let status_prefix = family("status");
        let task_inv_prefix = family("task_inv");
        let call_inv_prefix = family("call_inv");
        let waiters_prefix = family("waiters");
        let cc_prefix = family("cc");
        let cc_rev_prefix = family("cc_rev");
        let heartbeat_prefix = family("heartbeat");
        let retries_prefix = family("retries");

        // `pool` is moved in only after the last use of `base`.
        Self {
            pool,
            status_prefix,
            task_inv_prefix,
            call_inv_prefix,
            waiters_prefix,
            cc_prefix,
            cc_rev_prefix,
            heartbeat_prefix,
            retries_prefix,
        }
    }

    /// Returns the status key for `inv_id`: the status prefix followed by
    /// the invocation id's string form.
    fn status_key(&self, inv_id: &rustvello_proto::identifiers::InvocationId) -> String {
        let id = inv_id.as_str();
        prefixed_key(&self.status_prefix, id)
    }
}
/// Encodes `record` as a JSON string.
///
/// # Errors
///
/// Returns [`RustvelloError::Serialization`] with a message of the form
/// `status record: <serde_json error>` when encoding fails.
fn serialize_status_record(record: &InvocationStatusRecord) -> RustvelloResult<String> {
    match serde_json::to_string(record) {
        Ok(json) => Ok(json),
        Err(err) => Err(RustvelloError::Serialization {
            message: format!("status record: {err}"),
        }),
    }
}
/// Decodes an [`InvocationStatusRecord`] from the JSON text `s`.
///
/// # Errors
///
/// Returns [`RustvelloError::Serialization`] with a message of the form
/// `status record: <serde_json error>` when `s` cannot be decoded.
fn deserialize_status_record(s: &str) -> RustvelloResult<InvocationStatusRecord> {
    match serde_json::from_str::<InvocationStatusRecord>(s) {
        Ok(record) => Ok(record),
        Err(err) => Err(RustvelloError::Serialization {
            message: format!("status record: {err}"),
        }),
    }
}