rustvello-redis 0.1.0

Redis backend implementations for Rustvello
Documentation
use async_trait::async_trait;
use redis::AsyncCommands;

use rustvello_core::error::RustvelloResult;
use rustvello_core::orchestrator::OrchestratorBlocking;
use rustvello_proto::identifiers::InvocationId;

use super::{prefixed_key, RedisOrchestrator};
use crate::connection::redis_err;

#[async_trait]
impl OrchestratorBlocking for RedisOrchestrator {
    async fn set_waiting_for(
        &self,
        waiter: &InvocationId,
        waited_on: &InvocationId,
    ) -> RustvelloResult<()> {
        let mut conn = self.pool.conn().await?;
        conn.sadd::<_, _, ()>(
            prefixed_key(&self.waiters_prefix, waited_on.as_str()),
            waiter.as_str(),
        )
        .await
        .map_err(redis_err)
    }

    async fn get_waiters(&self, waited_on: &InvocationId) -> RustvelloResult<Vec<InvocationId>> {
        let mut conn = self.pool.conn().await?;
        let members: Vec<String> = conn
            .smembers(prefixed_key(&self.waiters_prefix, waited_on.as_str()))
            .await
            .map_err(redis_err)?;
        Ok(members.into_iter().map(InvocationId::from_string).collect())
    }

    async fn release_waiters(
        &self,
        completed: &InvocationId,
    ) -> RustvelloResult<Vec<InvocationId>> {
        let waiters = self.get_waiters(completed).await?;
        if !waiters.is_empty() {
            let mut conn = self.pool.conn().await?;
            conn.del::<_, ()>(prefixed_key(&self.waiters_prefix, completed.as_str()))
                .await
                .map_err(redis_err)?;
        }
        Ok(waiters)
    }
}