hermesmq-core 0.2.0

Core engine for hermesmq: queue state machine and Raft application types
Documentation
mod log_store;
mod network;
mod state_machine;

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use openraft::{BasicNode, Config, Raft, StorageError, StorageIOError};
use serde::de::DeserializeOwned;

pub use log_store::LogStore;
pub use network::{serve_peer, PartitionControl, PeerConnection, PeerNetwork};
pub use state_machine::{SnapshotBuilder, StateMachineStore};

use crate::raft::TypeConfig;
use crate::storage::Storage;
use crate::types::NodeId;
use crate::RedbStore;

pub type HermesRaft = Raft<TypeConfig>;

fn ioerr<E: std::fmt::Display>(e: E) -> std::io::Error {
    std::io::Error::other(e.to_string())
}

pub(crate) fn enc<T: serde::Serialize>(value: &T) -> Result<Vec<u8>, StorageError<NodeId>> {
    postcard::to_stdvec(value).map_err(|e| StorageIOError::write_logs(&ioerr(e)).into())
}

pub(crate) fn dec<T: DeserializeOwned>(bytes: &[u8]) -> Result<T, StorageError<NodeId>> {
    postcard::from_bytes(bytes).map_err(|e| StorageIOError::read_logs(&ioerr(e)).into())
}

pub(crate) fn sread<E: std::fmt::Display>(e: E) -> StorageError<NodeId> {
    StorageIOError::read_logs(&ioerr(e)).into()
}

pub(crate) fn swrite<E: std::fmt::Display>(e: E) -> StorageError<NodeId> {
    StorageIOError::write_logs(&ioerr(e)).into()
}

pub async fn build_raft_partitionable<S: Storage>(
    node_id: NodeId,
    db: Arc<S>,
) -> Result<(HermesRaft, StateMachineStore<S>, PartitionControl), Box<dyn std::error::Error + Send + Sync>>
{
    let config = Arc::new(Config {
        heartbeat_interval: 100,
        election_timeout_min: 500,
        election_timeout_max: 1500,
        max_payload_entries: 32,
        ..Config::default()
    });
    let log = LogStore::new(db.clone());
    let state_machine = StateMachineStore::new(db)?;
    let sm_read = state_machine.clone();
    let network = PeerNetwork::default();
    let blocked = network.blocked_handle();
    let raft = Raft::new(node_id, config, network, log, state_machine).await?;
    Ok((raft, sm_read, blocked))
}

pub async fn build_raft<S: Storage>(
    node_id: NodeId,
    db: Arc<S>,
) -> Result<(HermesRaft, StateMachineStore<S>), Box<dyn std::error::Error + Send + Sync>> {
    let (raft, sm, _blocked) = build_raft_partitionable(node_id, db).await?;
    Ok((raft, sm))
}

pub async fn initialize_cluster(
    raft: &HermesRaft,
    members: &[(NodeId, String)],
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let map: BTreeMap<NodeId, BasicNode> = members
        .iter()
        .map(|(id, addr)| (*id, BasicNode::new(addr)))
        .collect();
    raft.initialize(map).await?;
    Ok(())
}

pub async fn add_learner(
    raft: &HermesRaft,
    id: NodeId,
    addr: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    raft.add_learner(id, BasicNode::new(addr), true).await?;
    Ok(())
}

pub async fn set_voters(
    raft: &HermesRaft,
    voters: &[NodeId],
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let set: BTreeSet<NodeId> = voters.iter().copied().collect();
    raft.change_membership(set, false).await?;
    Ok(())
}

pub async fn start_single_node(
    node_id: NodeId,
    db: Arc<RedbStore>,
) -> Result<HermesRaft, Box<dyn std::error::Error + Send + Sync>> {
    let (raft, _sm) = build_raft(node_id, db).await?;
    initialize_cluster(&raft, &[(node_id, String::new())]).await?;
    Ok(raft)
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::{AppRequest, AppResponse, ContentType, GroupId, Priority, TopicId};

    #[tokio::test]
    async fn single_node_bootstrap_and_write() {
        let db = Arc::new(RedbStore::in_memory().unwrap());
        let raft = start_single_node(1, db).await.unwrap();

        raft.wait(Some(Duration::from_secs(10)))
            .current_leader(1, "elect self as leader")
            .await
            .unwrap();

        let created = raft
            .client_write(AppRequest::CreateTopic {
                topic: TopicId::from("orders"),
            })
            .await
            .unwrap();
        assert!(matches!(created.data, AppResponse::TopicCreated));

        let first = raft
            .client_write(AppRequest::Produce {
                topic: TopicId::from("orders"),
                priority: Priority::default(),
                content_type: ContentType::Raw,
                payload: b"hello".to_vec(),
                producer_id: "p1".to_string(),
                seq: 1,
                ts_ms: 0,
            })
            .await
            .unwrap();
        assert!(matches!(first.data, AppResponse::Produced { offset: 0 }));

        let second = raft
            .client_write(AppRequest::Produce {
                topic: TopicId::from("orders"),
                priority: Priority::default(),
                content_type: ContentType::Raw,
                payload: b"world".to_vec(),
                producer_id: "p1".to_string(),
                seq: 2,
                ts_ms: 0,
            })
            .await
            .unwrap();
        assert!(matches!(second.data, AppResponse::Produced { offset: 1 }));

        let polled = raft
            .client_write(AppRequest::Poll {
                topic: TopicId::from("orders"),
                group: GroupId::from("workers"),
                max: 10,
                visibility_timeout_ms: 1000,
                ts_ms: 0,
            })
            .await
            .unwrap();
        let leases: Vec<_> = match polled.data {
            AppResponse::Polled { items } => items,
            other => panic!("expected Polled, got {other:?}"),
        };
        assert_eq!(leases.len(), 2);

        raft.client_write(AppRequest::Ack {
            topic: TopicId::from("orders"),
            group: GroupId::from("workers"),
            lease_id: leases[0].lease_id,
        })
        .await
        .unwrap();

        let after = raft
            .client_write(AppRequest::Poll {
                topic: TopicId::from("orders"),
                group: GroupId::from("workers"),
                max: 10,
                visibility_timeout_ms: 1000,
                ts_ms: 5000,
            })
            .await
            .unwrap();
        match after.data {
            AppResponse::Polled { items } => {
                assert_eq!(items.len(), 1);
                assert_eq!(items[0].offset, leases[1].offset);
            }
            other => panic!("expected Polled, got {other:?}"),
        }

        raft.shutdown().await.unwrap();
    }
}