// asteroid-mq 0.1.0-alpha.3
//
// An embeddable message queue system
// Documentation
use std::{collections::HashMap, sync::Arc};

use serde::{Deserialize, Serialize};
use tracing::instrument;

use crate::{
    prelude::{Topic, TopicCode},
    protocol::{
        node::raft::proposal::{
            DelegateMessage, EndpointInterest, EndpointOffline, EndpointOnline, LoadTopic,
            ProposalContext, SetState, UnloadTopic,
        },
        topic::TopicInner,
    },
};

use super::topic::TopicData;

/// Node-level state: the per-topic data of every topic currently loaded.
///
/// Mutated by the `apply_*` methods below, each of which handles one proposal
/// type imported from `protocol::node::raft::proposal`. The type derives
/// `Serialize`/`Deserialize`, so the whole map can be (de)serialized as a unit.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NodeData {
    /// Per-topic state, keyed by topic code.
    pub(crate) topics: HashMap<TopicCode, TopicData>,
}

impl NodeData {
    /// Applies a [`DelegateMessage`] proposal by handing `message` to the
    /// state of the topic identified by `topic`.
    ///
    /// The topic code is recorded on `ctx` first. If the topic is not loaded,
    /// an error is logged and the message is dropped. The durable commands
    /// collected on `ctx` are committed in either case.
    #[instrument(skip_all, fields(node_id=%ctx.node.id(), topic=%topic, message_id = %message.id()))]
    pub(crate) fn apply_delegate_message(
        &mut self,
        DelegateMessage { topic, message }: DelegateMessage,
        mut ctx: ProposalContext,
    ) {
        ctx.set_topic_code(topic.clone());
        match self.topics.get_mut(&topic) {
            Some(topic_data) => {
                // `message` is owned and not used afterwards, so move it
                // instead of cloning it.
                topic_data.hold_new_message(message, &mut ctx);
            }
            None => {
                tracing::error!(?topic, "topic not found");
            }
        }
        ctx.commit_durable_commands();
    }

    /// Applies a [`LoadTopic`] proposal: creates the topic's state from its
    /// durable `config` and `queue`, and registers a matching [`Topic`] handle
    /// on the node held by `ctx`.
    ///
    /// The queue is sorted by message `time` (stable sort) before the state is
    /// built. If a topic with the same code is already present, a warning is
    /// logged and nothing else happens — in particular, no durable commands
    /// are committed.
    ///
    /// # Panics
    ///
    /// Panics if acquiring the write lock on the node's topic map fails
    /// (presumably lock poisoning — confirm against the lock type used by the node).
    pub(crate) fn apply_load_topic(
        &mut self,
        LoadTopic { config, mut queue }: LoadTopic,
        mut ctx: ProposalContext,
    ) {
        use std::collections::hash_map::Entry;
        let code = config.code.clone();
        match self.topics.entry(code.clone()) {
            Entry::Vacant(slot) => {
                queue.sort_by_key(|m| m.time);
                ctx.set_topic_code(code.clone());
                slot.insert(TopicData::from_durable(config, queue));
            }
            // Spelled out instead of `_` so the match stays exhaustive by name.
            Entry::Occupied(_) => {
                tracing::warn!(?code, "topic already loaded");
                return;
            }
        }
        let topic = Topic {
            inner: Arc::new(TopicInner {
                code: code.clone(),
                node: ctx.node.clone(),
                ack_waiting_pool: Default::default(),
                local_endpoints: Default::default(),
            }),
        };
        // Last use of `code`: move it into the map rather than cloning again.
        // The write guard is a temporary and is released at the end of this statement.
        ctx.node.topics.write().unwrap().insert(code, topic);
        ctx.commit_durable_commands();
    }

    /// Applies a [`SetState`] proposal by forwarding `update` to the state of
    /// the topic identified by `topic`.
    ///
    /// The topic code is recorded on `ctx` first. If the topic is not loaded,
    /// an error is logged and the update is dropped. The durable commands
    /// collected on `ctx` are committed in either case.
    #[instrument(skip_all, fields(node_id=%ctx.node.id(), topic=%topic, message_id=%update.message_id))]
    pub(crate) fn apply_set_state(
        &mut self,
        SetState { topic, update }: SetState,
        mut ctx: ProposalContext,
    ) {
        ctx.set_topic_code(topic.clone());
        match self.topics.get_mut(&topic) {
            Some(topic_data) => {
                // `update` is owned and not used afterwards, so move it
                // instead of cloning it.
                topic_data.update_and_flush(update, &mut ctx);
            }
            None => {
                tracing::error!(?topic, "topic not found");
            }
        }
        ctx.commit_durable_commands();
    }

    /// Applies an [`UnloadTopic`] proposal by removing the topic's state from
    /// this node data.
    ///
    /// Removing a topic that is not present is a no-op. This method only
    /// touches `self.topics`; it receives no context and does not modify any
    /// other registry.
    pub(crate) fn apply_unload_topic(&mut self, UnloadTopic { code }: UnloadTopic) {
        self.topics.remove(&code);
    }

    /// Applies an [`EndpointOnline`] proposal: reports `endpoint`, with its
    /// `interests` and `host`, as online to the topic's state.
    ///
    /// If the topic is not loaded, a warning is logged and the proposal is
    /// ignored without committing durable commands.
    pub(crate) fn apply_ep_online(
        &mut self,
        EndpointOnline {
            topic_code,
            endpoint,
            interests,
            host,
        }: EndpointOnline,
        mut ctx: ProposalContext,
    ) {
        let Some(topic) = self.topics.get_mut(&topic_code) else {
            tracing::warn!(?topic_code, "topic not found, endpoint online ignored");
            return;
        };
        ctx.set_topic_code(topic_code);
        topic.ep_online(endpoint, interests, host, &mut ctx);
        ctx.commit_durable_commands();
    }

    /// Applies an [`EndpointOffline`] proposal: reports `endpoint` on `host`
    /// as offline to the topic's state.
    ///
    /// If the topic is not loaded, a warning is logged and the proposal is
    /// ignored without committing durable commands.
    pub(crate) fn apply_ep_offline(
        &mut self,
        EndpointOffline {
            topic_code,
            endpoint,
            host,
        }: EndpointOffline,
        mut ctx: ProposalContext,
    ) {
        let Some(topic) = self.topics.get_mut(&topic_code) else {
            tracing::warn!(?topic_code, "topic not found, endpoint offline ignored");
            return;
        };
        ctx.set_topic_code(topic_code);
        topic.ep_offline(host, &endpoint, &mut ctx);
        ctx.commit_durable_commands();
    }

    /// Applies an [`EndpointInterest`] proposal: replaces the interests of
    /// `endpoint` in the topic's state with `interests`.
    ///
    /// If the topic is not loaded, a warning is logged and the proposal is
    /// ignored without committing durable commands.
    pub(crate) fn apply_ep_interest(
        &mut self,
        EndpointInterest {
            topic_code,
            endpoint,
            interests,
        }: EndpointInterest,
        mut ctx: ProposalContext,
    ) {
        let Some(topic) = self.topics.get_mut(&topic_code) else {
            tracing::warn!(?topic_code, "topic not found, endpoint interest update ignored");
            return;
        };
        ctx.set_topic_code(topic_code);
        topic.update_ep_interest(&endpoint, interests, &mut ctx);
        ctx.commit_durable_commands();
    }
}