harn-vm 0.8.6

Async bytecode virtual machine for the Harn programming language
Documentation
use std::collections::BTreeMap;
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use time::OffsetDateTime;

use crate::connectors::ConnectorError;
use crate::event_log::{AnyEventLog, EventLog, LogEvent, Topic};

/// Name of the event-log topic that `CronStateStore` reads from and appends to.
pub(crate) const CRON_STATE_TOPIC: &str = "connectors.cron.state";
/// Event kind attached to every record written by `CronStateStore::persist`;
/// `CronStateStore::load_all` skips records whose kind differs from this value.
const STATE_EVENT_KIND: &str = "cron_trigger_state";

/// Stores and retrieves per-trigger cron state as events on a single
/// event-log topic.
///
/// Cloning the store clones the `Arc` handle and the topic value; it does not
/// copy the underlying log.
#[derive(Clone)]
pub(crate) struct CronStateStore {
    /// Shared handle to the event log used for both reads and appends.
    event_log: Arc<AnyEventLog>,
    /// Topic built from `CRON_STATE_TOPIC` when the store is constructed.
    topic: Topic,
}

impl CronStateStore {
    /// Creates a store backed by `event_log`, bound to the `CRON_STATE_TOPIC`
    /// topic.
    ///
    /// # Panics
    ///
    /// Panics if `Topic::new` rejects `CRON_STATE_TOPIC`.
    pub(crate) fn new(event_log: Arc<AnyEventLog>) -> Self {
        let topic = Topic::new(CRON_STATE_TOPIC).expect("cron state topic is valid");
        Self { event_log, topic }
    }

    /// Reads the whole state topic and returns one entry per trigger id.
    ///
    /// Records are applied in the order `read_range` yields them, so when a
    /// trigger id appears more than once the record seen last replaces the
    /// earlier ones. Records whose kind is not `STATE_EVENT_KIND` are ignored.
    ///
    /// # Errors
    ///
    /// Returns a `ConnectorError` if reading the log fails or if a matching
    /// record's payload cannot be deserialized into `PersistedCronState`; the
    /// first such failure aborts the load.
    pub(crate) async fn load_all(
        &self,
    ) -> Result<BTreeMap<String, PersistedCronState>, ConnectorError> {
        let records = self
            .event_log
            .read_range(&self.topic, None, usize::MAX)
            .await
            .map_err(ConnectorError::from)?;

        let mut by_trigger = BTreeMap::new();
        for (_, record) in records {
            if record.kind == STATE_EVENT_KIND {
                let entry: PersistedCronState =
                    serde_json::from_value(record.payload).map_err(ConnectorError::from)?;
                let key = entry.trigger_id.clone();
                by_trigger.insert(key, entry);
            }
        }
        Ok(by_trigger)
    }

    /// Returns the stored state for `trigger_id`, or `None` if the topic holds
    /// no state record for it.
    ///
    /// This delegates to [`Self::load_all`], so it reads the entire topic on
    /// every call.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::load_all`].
    pub(crate) async fn load(
        &self,
        trigger_id: &str,
    ) -> Result<Option<PersistedCronState>, ConnectorError> {
        let mut all_states = self.load_all().await?;
        Ok(all_states.remove(trigger_id))
    }

    /// Appends `state` to the state topic as a `STATE_EVENT_KIND` event.
    ///
    /// # Errors
    ///
    /// Returns a `ConnectorError` if `state` cannot be serialized to JSON or
    /// if the append to the event log fails.
    pub(crate) async fn persist(&self, state: PersistedCronState) -> Result<(), ConnectorError> {
        let body = serde_json::to_value(&state).map_err(ConnectorError::from)?;
        let event = LogEvent::new(STATE_EVENT_KIND, body);
        self.event_log
            .append(&self.topic, event)
            .await
            .map_err(ConnectorError::from)?;
        Ok(())
    }
}

/// Serializable state record for a single cron trigger, stored as the JSON
/// payload of a `STATE_EVENT_KIND` event.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct PersistedCronState {
    /// Identifier of the trigger; `CronStateStore::load_all` uses it as the map key.
    pub trigger_id: String,
    /// Timestamp recorded for the trigger, (de)serialized through the `time`
    /// crate's RFC 3339 serde adapter.
    // NOTE(review): presumably the time the trigger last fired, judging by the
    // field name — confirm against the code that constructs this value.
    #[serde(with = "time::serde::rfc3339")]
    pub last_fired_at: OffsetDateTime,
}