1use crate::{scheduler::Runtime, utils, Event, Message};
2use std::sync::Arc;
3use tracing::{debug, error, info};
4
5#[derive(Debug, Clone)]
6pub struct ChannelOptions {
7 pub id: String,
8
9 pub ack: bool,
11
12 pub r#type: String,
15 pub state: String,
18 pub tag: String,
21 pub key: String,
24}
25
26impl Default for ChannelOptions {
27 fn default() -> Self {
28 Self {
29 id: utils::shortid(),
30 ack: false,
31 r#type: "*".to_string(),
32 state: "*".to_string(),
33 tag: "*".to_string(),
34 key: "*".to_string(),
35 }
36 }
37}
38
39impl ChannelOptions {
40 pub fn pattern(&self) -> String {
41 format!("{}:{}:{}:{}", self.r#type, self.state, self.tag, self.key)
42 }
43}
44
45pub struct Channel {
48 runtime: Arc<Runtime>,
49 ack: bool,
50 chan_id: String,
51 pattern: String,
52 glob: (
53 globset::GlobMatcher,
54 globset::GlobMatcher,
55 globset::GlobMatcher,
56 globset::GlobMatcher,
57 ),
58}
59
60impl Channel {
61 pub fn new(rt: &Arc<Runtime>) -> Self {
62 Self::channel(rt, &ChannelOptions::default())
63 }
64
65 #[allow(clippy::self_named_constructors)]
68 pub fn channel(rt: &Arc<Runtime>, options: &ChannelOptions) -> Self {
69 debug!("channel: {options:?}");
70 let pat_type = globset::Glob::new(&options.r#type)
71 .unwrap()
72 .compile_matcher();
73 let pat_state = globset::Glob::new(&options.state)
74 .unwrap()
75 .compile_matcher();
76 let pat_tag = globset::Glob::new(&options.tag).unwrap().compile_matcher();
77 let pat_key = globset::Glob::new(&options.key).unwrap().compile_matcher();
78
79 Self {
80 runtime: rt.clone(),
81 ack: options.ack,
82 chan_id: options.id.clone(),
83 pattern: options.pattern(),
84 glob: (pat_type, pat_state, pat_tag, pat_key),
85 }
86 }
87
88 pub fn on_message(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
117 let glob = self.glob.clone();
118 let runtime = self.runtime.clone();
119 let ack = self.ack;
120 let chan_id = self.chan_id.clone();
121 let pattern = self.pattern.clone();
122 self.runtime.emitter().on_message(&self.chan_id, move |e| {
123 info!("on_message: chan={} {e:?}", chan_id);
124 if is_match(&glob, e) {
125 store_if(&runtime, ack, &chan_id, &pattern, e);
126 f(e);
127 }
128 });
129 }
130
131 pub fn on_start(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
132 let glob = self.glob.clone();
133 let runtime = self.runtime.clone();
134 let ack = self.ack;
135 let chan_id = self.chan_id.clone();
136 let pattern = self.pattern.clone();
137 self.runtime.emitter().on_start(&self.chan_id, move |e| {
138 if is_match(&glob, e) {
139 store_if(&runtime, ack, &chan_id, &pattern, e);
140 f(e);
141 }
142 });
143 }
144
145 pub fn on_complete(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
146 let glob = self.glob.clone();
147 let runtime = self.runtime.clone();
148 let ack = self.ack;
149 let chan_id = self.chan_id.clone();
150 let pattern = self.pattern.clone();
151 self.runtime.emitter().on_complete(&self.chan_id, move |e| {
152 if is_match(&glob, e) {
153 store_if(&runtime, ack, &chan_id, &pattern, e);
154 f(e);
155 }
156 });
157 }
158
159 pub fn on_error(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
160 let glob = self.glob.clone();
161 let runtime = self.runtime.clone();
162 let ack = self.ack;
163 let chan_id = self.chan_id.clone();
164 let pattern = self.pattern.clone();
165 self.runtime.emitter().on_error(&self.chan_id, move |e| {
166 if is_match(&glob, e) {
167 store_if(&runtime, ack, &chan_id, &pattern, e);
168 f(e);
169 }
170 });
171 }
172
173 pub fn close(&self) {
174 self.runtime.emitter().remove(&self.chan_id);
175 }
176}
177
178fn store_if(runtime: &Arc<Runtime>, ack: bool, chan_id: &str, pattern: &str, message: &Message) {
179 if ack && !chan_id.is_empty() && message.retry_times == 0 {
180 println!("store: {message:?}");
181 let msg = message.into(chan_id, pattern);
182 runtime
183 .cache()
184 .store()
185 .base()
186 .messages()
187 .create(&msg)
188 .unwrap_or_else(|err| {
189 error!("channel.store_if_emit_id: {}", err.to_string());
190 eprintln!("channel.store_if_emit_id: {}", err);
191 false
192 });
193 }
194}
195
196fn is_match(
197 glob: &(
198 globset::GlobMatcher,
199 globset::GlobMatcher,
200 globset::GlobMatcher,
201 globset::GlobMatcher,
202 ),
203 e: &Event<Message>,
204) -> bool {
205 let (pat_type, pat_state, pat_tag, pat_key) = glob;
206 pat_type.is_match(&e.r#type)
207 && pat_state.is_match(e.state.as_ref())
208 && (pat_tag.is_match(&e.tag) || pat_tag.is_match(&e.model.tag))
209 && pat_key.is_match(&e.key)
210}