// romp 0.5.2
//
// STOMP server and WebSockets platform
// Documentation

use std::sync::{Arc, RwLock};

use log::*;

use chrono::{DateTime, Utc};

use crate::message::stomp_message::{StompMessage, Header};
use crate::session::filter::Filter;
use crate::session::stomp_session::StompSession;
use crate::util::rollover_gt;
use crate::workflow::destination::destination::Destination;
use std::ops::Deref;

/// Acknowledgement mode of a subscription, as selected by the `ack` header
/// (see `parse_ack_hdr`).
#[derive(Debug, PartialEq)]
pub enum Ack {
    /// Fallback mode used for any header value other than `client` or
    /// `client-individual`. `Subscription::publish` registers no pending ack
    /// for messages delivered in this mode.
    Auto,
    /// Selected by the header value `client`. Delivered messages are
    /// registered with the session as pending acknowledgement.
    Client,
    /// Selected by the header value `client-individual`. Delivered messages
    /// are registered with the session as pending acknowledgement.
    ClientIndividual,
}

/// Maps the value of an `ack` header onto an [`Ack`] mode.
///
/// The comparison is exact and case-sensitive: `"client"` yields
/// `Ack::Client`, `"client-individual"` yields `Ack::ClientIndividual`, and
/// every other value (including the empty string) yields `Ack::Auto`.
pub fn parse_ack_hdr(hdr: &String) -> Ack {
    let value: &str = &hdr[..];
    match value {
        "client-individual" => Ack::ClientIndividual,
        "client" => Ack::Client,
        _ => Ack::Auto,
    }
}

/// A subscription to a destination.
///
/// Subscriptions are stored on the `Destination` and on the session, and hold
/// references to the session so that messages can be queued to it.
// TODO could be weak references so we don't have to manually unhook on shutdown

pub struct Subscription {
    /// Session that published messages are sent to.
    session: Arc<RwLock<StompSession>>,
    /// Destination this subscription belongs to; `unsubscribe` removes the
    /// subscription from it by `hash_id`.
    destination: Arc<RwLock<Destination>>,
    /// Optional message filter, consulted by `publish` when filtering is requested.
    filter: Option<Filter>,
    /// When true, `publish` skips messages whose `session_id` equals this
    /// subscription's own session id.
    filter_self: bool,
    /// Subscription id; sent as the `subscription` header on delivered messages.
    id: u64,
    /// Key built in `new` as `"<session id>-<subscription id>"`.
    hash_id: String,
    /// Id of the last delivered message; `0` means nothing has been delivered yet.
    last_msg: usize,
    /// Creation time (UTC) of the subscription.
    timestamp: DateTime<Utc>,
    /// Acknowledgement mode: auto|client|client-individual.
    ack: Ack,
}

impl Subscription {
    /// Creates a subscription of `session` to `destination`.
    ///
    /// * `id` - subscription id, echoed in the `subscription` header of delivered messages.
    /// * `ack` - acknowledgement mode.
    /// * `filter` - optional message filter applied by [`Subscription::publish`].
    /// * `filter_self` - when true, messages originating from `session` itself are not delivered.
    ///
    /// The hash id is built as `"<session id>-<subscription id>"` and the
    /// creation timestamp is the current UTC time.
    ///
    /// # Panics
    ///
    /// Panics if the session lock is poisoned.
    pub fn new(
        session: Arc<RwLock<StompSession>>,
        destination: Arc<RwLock<Destination>>,
        id: u64,
        ack: Ack,
        filter: Option<Filter>,
        filter_self: bool,
    ) -> Subscription {
        // The read guard is a temporary of this statement, so the lock is
        // released before `session` is moved into the struct below.
        let session_id = session.read().unwrap().id();
        debug!("new subscription id={}, ack_mode={:?}, filter_self={}", id, ack, filter_self);
        let hash_id = format!("{}-{}", session_id, id);
        Subscription {
            session,
            destination,
            filter,
            filter_self,
            id,
            hash_id,
            last_msg: 0,
            timestamp: Utc::now(),
            ack,
        }
    }

    /// Returns the `"<session id>-<subscription id>"` key of this subscription.
    pub fn hash_id(&self) -> &String {
        &self.hash_id
    }

    /// Returns a copy of the session's user name, or an empty string when the
    /// session has no user.
    ///
    /// # Panics
    ///
    /// Panics if the session lock is poisoned.
    pub fn user_id(&self) -> String {
        let session = self.session.read().unwrap();
        session.user.clone().unwrap_or_default()
    }

    /// Returns the subscription id.
    pub fn subscription_id(&self) -> u64 {
        self.id
    }

    /// Removes this subscription from its destination, forwarding the
    /// destination's result.
    ///
    /// # Panics
    ///
    /// Panics if the destination lock is poisoned.
    pub(crate) fn unsubscribe(&self) -> bool {
        let mut destination = self.destination.write().unwrap();
        destination.unsubscribe(&self.hash_id)
    }

    /// Sends a published message to the client, unless it is filtered out or
    /// was already delivered.
    ///
    /// The message is skipped (returning `false`) when:
    /// * `do_filter` is set and the subscription's filter does not match it,
    /// * its id is not newer than the last delivered id (per `rollover_gt`), or
    /// * `filter_self` is set and the message came from this subscription's own session.
    ///
    /// Otherwise the message is sent with `subscription` and `ack` headers and
    /// recorded as the last delivered message.
    ///
    /// Returns `true` only when the message was sent and the ack mode is
    /// `Ack::Auto`. For the client ack modes the message is registered with
    /// the session as pending acknowledgement for `dest_id` and `false` is
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if the session lock is poisoned.
    pub(crate) fn publish(&mut self, dest_id: usize, message: Arc<StompMessage>, do_filter: bool) -> bool {
        let msg_id = message.id;

        if do_filter {
            if let Some(filter) = &self.filter {
                if !filter.matches_message(message.deref()) {
                    debug!("filtered out {}", msg_id);
                    return false;
                }
            }
        }

        // `last_msg == 0` means nothing has been delivered on this subscription yet.
        if self.last_msg != 0 && !rollover_gt(msg_id, self.last_msg) {
            debug!("already published {}", msg_id);
            return false;
        }

        let session = self.session.read().unwrap();
        if self.filter_self {
            if let Some(session_id) = message.session_id {
                if session_id == session.id() {
                    debug!("not sending to self {}", msg_id);
                    return false;
                }
            }
        }

        // Headers are built only once the message is known to be sent, and the
        // owned `Arc` is handed over directly instead of being cloned.
        let sub_hdr = Header::from_string("subscription", self.id.to_string());
        let ack_hdr = Header::from_string("ack", msg_id.to_string());
        session.send_message_w_hdrs(message, vec![sub_hdr, ack_hdr]);
        self.last_msg = msg_id;
        debug!("published message id {}", msg_id);

        match self.ack {
            Ack::Auto => true,
            _ => {
                session.pending_ack(msg_id, dest_id);
                false
            }
        }
    }

    /// Returns a short description for diagnostics, of the form
    /// `sub user="<user>" created=<RFC 3339 timestamp>`; the `user` part is
    /// omitted when the session has no user.
    ///
    /// # Panics
    ///
    /// Panics if the session lock is poisoned.
    pub fn as_string(&self) -> String {
        let session = self.session.read().unwrap();

        let mut out = String::from("sub");
        if let Some(user) = &session.user {
            out.push_str(&format!(" user=\"{}\"", user));
        }
        out.push_str(" created=");
        out.push_str(&self.timestamp.to_rfc3339());
        out
    }
}

/// Logs when a subscription is dropped; no other cleanup is performed here.
impl Drop for Subscription {
    /// Emits a debug-level log entry.
    fn drop(&mut self) {
        debug!("dropped");
    }
}