use std::sync::{Arc, RwLock};
use log::*;
use chrono::{DateTime, Utc};
use crate::message::stomp_message::{StompMessage, Header};
use crate::session::filter::Filter;
use crate::session::stomp_session::StompSession;
use crate::util::rollover_gt;
use crate::workflow::destination::destination::Destination;
use std::ops::Deref;
#[derive(Debug, PartialEq)]
pub enum Ack {
Auto,
Client,
ClientIndividual,
}
pub fn parse_ack_hdr(hdr: &String) -> Ack {
match hdr.as_str() {
"client" => return Ack::Client,
"client-individual" => return Ack::ClientIndividual,
_ => Ack::Auto,
}
}
pub struct Subscription {
session: Arc<RwLock<StompSession>>,
destination: Arc<RwLock<Destination>>,
filter: Option<Filter>,
filter_self: bool,
id: u64, hash_id: String,
last_msg: usize, timestamp: DateTime<Utc>, ack: Ack, }
impl Subscription {
pub fn new(session: Arc<RwLock<StompSession>>, destination: Arc<RwLock<Destination>>, id: u64, ack: Ack, filter: Option<Filter>, filter_self: bool) -> Subscription {
let session_id = session.read().unwrap().id();
debug!("new subscription id={}, ack_mode={:?}, filter_self={}", id, ack, filter_self);
Subscription {
session,
destination,
filter,
filter_self,
id,
hash_id: format!("{}-{}", session_id, id),
last_msg: 0,
timestamp: Utc::now(),
ack,
}
}
pub fn hash_id(&self) -> &String {
&self.hash_id
}
pub fn user_id(&self) -> String {
if let Some(user) = &self.session.read().unwrap().user {
return user.clone();
} else {
return String::from("");
}
}
pub fn subscription_id(&self) -> u64 {
self.id
}
pub(crate) fn unsubscribe(&self) -> bool {
self.destination.write().unwrap().unsubscribe(&self.hash_id)
}
pub(crate) fn publish(&mut self, dest_id: usize, message: Arc<StompMessage>, do_filter: bool) -> bool {
let msg_id;
{
msg_id = message.id;
if do_filter {
if let Some(filter) = &self.filter {
if !filter.matches_message(message.deref()) {
debug!("filtered out {}", msg_id);
return false;
}
}
}
}
if self.last_msg == 0 || rollover_gt(msg_id, self.last_msg) {
let sub_hdr = Header::from_string("subscription", self.id.to_string());
let ack_hdr = Header::from_string("ack", msg_id.to_string());
let session = self.session.read().unwrap();
if self.filter_self {
if let Some(session_id) = message.session_id {
if session_id == session.id() {
debug!("not sending to self {}", msg_id);
return false;
}
}
}
session.send_message_w_hdrs(message.clone(), vec![sub_hdr, ack_hdr]);
self.last_msg = msg_id;
debug!("published message id {}", msg_id);
match self.ack {
Ack::Auto => {
return true;
},
_ => {
session.pending_ack(msg_id, dest_id);
return false;
},
}
} else {
debug!("already published {}", msg_id);
return false;
}
}
pub fn as_string(&self) -> String{
let mut debug = String::new();
debug.push_str("sub");
let session = self.session.read().unwrap();
if let Some(user) = &session.user {
debug.push_str(" user=\"");
debug.push_str(user.as_str());
debug.push_str("\"");
}
debug.push_str(" created=");
debug.push_str(&self.timestamp.to_rfc3339());
debug
}
}
impl Drop for Subscription {
fn drop(&mut self) {
debug!("dropped");
}
}