use crate::{Event, Message, scheduler::Runtime, utils};
use std::sync::Arc;
use tracing::{debug, error, info};
#[derive(Debug, Clone)]
pub struct ChannelOptions {
pub id: String,
pub ack: bool,
pub r#type: String,
pub state: String,
pub tag: String,
pub key: String,
pub uses: String,
}
impl Default for ChannelOptions {
fn default() -> Self {
Self {
id: utils::shortid(),
ack: false,
r#type: "*".to_string(),
state: "*".to_string(),
tag: "*".to_string(),
key: "*".to_string(),
uses: "*".to_string(),
}
}
}
impl ChannelOptions {
pub fn pattern(&self) -> String {
format!("{}:{}:{}:{}", self.r#type, self.state, self.tag, self.key)
}
}
pub struct Channel {
runtime: Arc<Runtime>,
ack: bool,
chan_id: String,
pattern: String,
glob: (
globset::GlobMatcher,
globset::GlobMatcher,
globset::GlobMatcher,
globset::GlobMatcher,
globset::GlobMatcher,
),
}
impl Channel {
pub fn new(rt: &Arc<Runtime>) -> Self {
Self::channel(rt, &ChannelOptions::default())
}
#[allow(clippy::self_named_constructors)]
pub fn channel(rt: &Arc<Runtime>, options: &ChannelOptions) -> Self {
debug!("channel: {options:?}");
let pat_type = globset::Glob::new(&options.r#type)
.unwrap()
.compile_matcher();
let pat_state = globset::Glob::new(&options.state)
.unwrap()
.compile_matcher();
let pat_tag = globset::Glob::new(&options.tag).unwrap().compile_matcher();
let pat_key = globset::Glob::new(&options.key).unwrap().compile_matcher();
let pat_uses = globset::Glob::new(&options.uses).unwrap().compile_matcher();
Self {
runtime: rt.clone(),
ack: options.ack,
chan_id: options.id.clone(),
pattern: options.pattern(),
glob: (pat_type, pat_state, pat_tag, pat_key, pat_uses),
}
}
pub fn on_message(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
let glob = self.glob.clone();
let runtime = self.runtime.clone();
let ack = self.ack;
let chan_id = self.chan_id.clone();
let pattern = self.pattern.clone();
self.runtime.emitter().on_message(&self.chan_id, move |e| {
info!("on_message: chan={} {e:?}", chan_id);
if is_match(&glob, e) {
store_if(&runtime, ack, &chan_id, &pattern, e);
f(e);
}
});
}
pub fn on_start(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
let glob = self.glob.clone();
let runtime = self.runtime.clone();
let ack = self.ack;
let chan_id = self.chan_id.clone();
let pattern = self.pattern.clone();
self.runtime.emitter().on_start(&self.chan_id, move |e| {
if is_match(&glob, e) {
store_if(&runtime, ack, &chan_id, &pattern, e);
f(e);
}
});
}
pub fn on_complete(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
let glob = self.glob.clone();
let runtime = self.runtime.clone();
let ack = self.ack;
let chan_id = self.chan_id.clone();
let pattern = self.pattern.clone();
self.runtime.emitter().on_complete(&self.chan_id, move |e| {
if is_match(&glob, e) {
store_if(&runtime, ack, &chan_id, &pattern, e);
f(e);
}
});
}
pub fn on_error(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
let glob = self.glob.clone();
let runtime = self.runtime.clone();
let ack = self.ack;
let chan_id = self.chan_id.clone();
let pattern = self.pattern.clone();
self.runtime.emitter().on_error(&self.chan_id, move |e| {
if is_match(&glob, e) {
store_if(&runtime, ack, &chan_id, &pattern, e);
f(e);
}
});
}
pub fn close(&self) {
self.runtime.emitter().remove(&self.chan_id);
}
}
fn store_if(runtime: &Arc<Runtime>, ack: bool, chan_id: &str, pattern: &str, message: &Message) {
if ack && !chan_id.is_empty() && message.retry_times == 0 {
info!("store: {message:?}");
let msg = message.into(chan_id, pattern);
runtime
.cache()
.store()
.messages()
.create(&msg)
.unwrap_or_else(|err| {
error!("channel.store_if_emit_id: {}", err.to_string());
eprintln!("channel.store_if_emit_id: {}", err);
false
});
}
}
fn is_match(
glob: &(
globset::GlobMatcher,
globset::GlobMatcher,
globset::GlobMatcher,
globset::GlobMatcher,
globset::GlobMatcher,
),
e: &Event<Message>,
) -> bool {
let (pat_type, pat_state, pat_tag, pat_key, pat_uses) = glob;
pat_type.is_match(&e.r#type)
&& pat_state.is_match(e.state.as_ref())
&& (pat_tag.is_match(&e.tag) || pat_tag.is_match(&e.model.tag))
&& pat_key.is_match(&e.key)
&& pat_uses.is_match(&e.uses)
}