use std::future::Future;
use std::pin::Pin;
use redis::Client;
use redis::aio::ConnectionManager;
use crate::error::{AdaptiveError, Result};
use crate::storage::traits::{StorageBackend, StorageBackendDyn};
use crate::trie::accumulator::AccumulatorState;
use crate::trie::serialization::TrieEnvelope;
use crate::types::plan::ExecutionPlan;
use crate::types::records::RunRecord;
pub struct RedisBackend {
client: Client,
conn: ConnectionManager,
key_prefix: String,
}
impl RedisBackend {
pub async fn new(url: &str, key_prefix: impl Into<String>) -> Result<Self> {
let client = redis::Client::open(url)
.map_err(|e| AdaptiveError::Storage(format!("redis client: {e}")))?;
let conn = client
.get_connection_manager()
.await
.map_err(|e| AdaptiveError::Storage(format!("redis connection: {e}")))?;
Ok(Self {
client,
conn,
key_prefix: key_prefix.into(),
})
}
fn key(&self, kind: &str, agent_id: &str) -> String {
format!("{}{}:{}", self.key_prefix, kind, agent_id)
}
fn run_key(&self, agent_id: &str, run_id: &uuid::Uuid) -> String {
format!("{}runs:{}:{}", self.key_prefix, agent_id, run_id)
}
async fn store_run_impl(&self, record: &RunRecord) -> Result<()> {
let mut conn = self.conn.clone();
let run_key = self.run_key(&record.agent_id, &record.id);
let index_key = self.key("runs_index", &record.agent_id);
let json = serde_json::to_string(record).map_err(AdaptiveError::Serialization)?;
let run_id_str = record.id.to_string();
redis::pipe()
.atomic()
.cmd("SET")
.arg(&run_key)
.arg(&json)
.cmd("RPUSH")
.arg(&index_key)
.arg(&run_id_str)
.exec_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis store_run pipeline: {e}")))?;
Ok(())
}
async fn load_plan_impl(&self, agent_id: &str) -> Result<Option<ExecutionPlan>> {
let mut conn = self.conn.clone();
let key = self.key("plan", agent_id);
let maybe_json: Option<String> =
redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis GET plan: {e}")))?;
match maybe_json {
Some(json) => {
let plan = serde_json::from_str(&json).map_err(AdaptiveError::Serialization)?;
Ok(Some(plan))
}
None => Ok(None),
}
}
async fn list_runs_impl(&self, agent_id: &str) -> Result<Vec<RunRecord>> {
let mut conn = self.conn.clone();
let index_key = self.key("runs_index", agent_id);
let prefix = self.key_prefix.clone();
let agent_id_owned = agent_id.to_string();
let ids: Vec<String> = redis::cmd("LRANGE")
.arg(&index_key)
.arg(0i64)
.arg(-1i64)
.query_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis LRANGE runs: {e}")))?;
let mut records = Vec::with_capacity(ids.len());
for id_str in &ids {
let key = format!("{}runs:{}:{}", prefix, agent_id_owned, id_str);
let maybe_json: Option<String> = redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis GET run: {e}")))?;
if let Some(json) = maybe_json {
let record: RunRecord =
serde_json::from_str(&json).map_err(AdaptiveError::Serialization)?;
records.push(record);
}
}
Ok(records)
}
async fn store_trie_impl(&self, agent_id: &str, envelope: &TrieEnvelope) -> Result<()> {
let mut conn = self.conn.clone();
let key = self.key("trie", agent_id);
let json = serde_json::to_string(envelope).map_err(AdaptiveError::Serialization)?;
redis::cmd("SET")
.arg(&key)
.arg(&json)
.exec_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis SET trie: {e}")))?;
Ok(())
}
async fn load_trie_impl(&self, agent_id: &str) -> Result<Option<TrieEnvelope>> {
let mut conn = self.conn.clone();
let key = self.key("trie", agent_id);
let maybe_json: Option<String> =
redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis GET trie: {e}")))?;
match maybe_json {
Some(json) => {
let envelope = serde_json::from_str(&json).map_err(AdaptiveError::Serialization)?;
Ok(Some(envelope))
}
None => Ok(None),
}
}
async fn store_accumulators_impl(
&self,
agent_id: &str,
state: &AccumulatorState,
) -> Result<()> {
let mut conn = self.conn.clone();
let key = self.key("accumulators", agent_id);
let json = serde_json::to_string(state).map_err(AdaptiveError::Serialization)?;
redis::cmd("SET")
.arg(&key)
.arg(&json)
.exec_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis SET accumulators: {e}")))?;
Ok(())
}
async fn load_accumulators_impl(&self, agent_id: &str) -> Result<Option<AccumulatorState>> {
let mut conn = self.conn.clone();
let key = self.key("accumulators", agent_id);
let maybe_json: Option<String> =
redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis GET accumulators: {e}")))?;
match maybe_json {
Some(json) => {
let state = serde_json::from_str(&json).map_err(AdaptiveError::Serialization)?;
Ok(Some(state))
}
None => Ok(None),
}
}
}
impl StorageBackend for RedisBackend {
fn store_run(&self, record: &RunRecord) -> impl Future<Output = Result<()>> + Send {
self.store_run_impl(record)
}
fn load_plan(
&self,
agent_id: &str,
) -> impl Future<Output = Result<Option<ExecutionPlan>>> + Send {
self.load_plan_impl(agent_id)
}
fn list_runs(&self, agent_id: &str) -> impl Future<Output = Result<Vec<RunRecord>>> + Send {
self.list_runs_impl(agent_id)
}
}
impl StorageBackendDyn for RedisBackend {
fn store_run_dyn<'a>(
&'a self,
record: &'a RunRecord,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(self.store_run_impl(record))
}
fn load_plan_dyn<'a>(
&'a self,
agent_id: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionPlan>>> + Send + 'a>> {
Box::pin(self.load_plan_impl(agent_id))
}
fn list_runs_dyn<'a>(
&'a self,
agent_id: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Vec<RunRecord>>> + Send + 'a>> {
Box::pin(self.list_runs_impl(agent_id))
}
fn store_trie<'a>(
&'a self,
agent_id: &'a str,
envelope: &'a TrieEnvelope,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(self.store_trie_impl(agent_id, envelope))
}
fn load_trie<'a>(
&'a self,
agent_id: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Option<TrieEnvelope>>> + Send + 'a>> {
Box::pin(self.load_trie_impl(agent_id))
}
fn store_accumulators<'a>(
&'a self,
agent_id: &'a str,
state: &'a AccumulatorState,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(self.store_accumulators_impl(agent_id, state))
}
fn load_accumulators<'a>(
&'a self,
agent_id: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Option<AccumulatorState>>> + Send + 'a>> {
Box::pin(self.load_accumulators_impl(agent_id))
}
fn store_plan(&self, plan: &ExecutionPlan) -> Result<()> {
let mut conn = self
.client
.get_connection()
.map_err(|e| AdaptiveError::Storage(format!("redis connection: {e}")))?;
let key = self.key("plan", &plan.agent_id);
let json = serde_json::to_string(plan).map_err(AdaptiveError::Serialization)?;
redis::cmd("SET")
.arg(&key)
.arg(&json)
.exec(&mut conn)
.map_err(|e| AdaptiveError::Storage(format!("redis SET plan: {e}")))?;
Ok(())
}
fn store_observations<'a>(
&'a self,
agent_id: &'a str,
observations: &'a [crate::acg::prompt_ir::PromptIR],
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
let mut conn = self.conn.clone();
let key = self.key("acg_observations", agent_id);
let json = serde_json::to_string(observations);
Box::pin(async move {
let json = json.map_err(AdaptiveError::Serialization)?;
redis::cmd("SET")
.arg(&key)
.arg(&json)
.exec_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis SET acg_observations: {e}")))?;
Ok(())
})
}
fn load_observations<'a>(
&'a self,
agent_id: &'a str,
) -> Pin<
Box<dyn Future<Output = Result<Option<Vec<crate::acg::prompt_ir::PromptIR>>>> + Send + 'a>,
> {
let mut conn = self.conn.clone();
let key = self.key("acg_observations", agent_id);
Box::pin(async move {
let maybe_json: Option<String> = redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis GET acg_observations: {e}")))?;
match maybe_json {
Some(json) => {
let obs = serde_json::from_str(&json).map_err(AdaptiveError::Serialization)?;
Ok(Some(obs))
}
None => Ok(None),
}
})
}
fn store_stability<'a>(
&'a self,
agent_id: &'a str,
result: &'a crate::acg::stability::StabilityAnalysisResult,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
let mut conn = self.conn.clone();
let key = self.key("acg_stability", agent_id);
let json = serde_json::to_string(result);
Box::pin(async move {
let json = json.map_err(AdaptiveError::Serialization)?;
redis::cmd("SET")
.arg(&key)
.arg(&json)
.exec_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis SET acg_stability: {e}")))?;
Ok(())
})
}
fn load_stability<'a>(
&'a self,
agent_id: &'a str,
) -> Pin<
Box<
dyn Future<Output = Result<Option<crate::acg::stability::StabilityAnalysisResult>>>
+ Send
+ 'a,
>,
> {
let mut conn = self.conn.clone();
let key = self.key("acg_stability", agent_id);
Box::pin(async move {
let maybe_json: Option<String> = redis::cmd("GET")
.arg(&key)
.query_async(&mut conn)
.await
.map_err(|e| AdaptiveError::Storage(format!("redis GET acg_stability: {e}")))?;
match maybe_json {
Some(json) => {
let result =
serde_json::from_str(&json).map_err(AdaptiveError::Serialization)?;
Ok(Some(result))
}
None => Ok(None),
}
})
}
}
#[cfg(test)]
#[path = "../tests/unit/redis_tests.rs"]
mod tests;