1use crate::{Event, Message, scheduler::Runtime, utils};
2use std::sync::Arc;
3use tracing::{debug, error, info};
4
5#[derive(Debug, Clone)]
6pub struct ChannelOptions {
7 pub id: String,
8
9 pub ack: bool,
11
12 pub r#type: String,
15 pub state: String,
18 pub tag: String,
21 pub key: String,
24
25 pub uses: String,
27}
28
29impl Default for ChannelOptions {
30 fn default() -> Self {
31 Self {
32 id: utils::shortid(),
33 ack: false,
34 r#type: "*".to_string(),
35 state: "*".to_string(),
36 tag: "*".to_string(),
37 key: "*".to_string(),
38 uses: "*".to_string(),
39 }
40 }
41}
42
43impl ChannelOptions {
44 pub fn pattern(&self) -> String {
45 format!("{}:{}:{}:{}", self.r#type, self.state, self.tag, self.key)
46 }
47}
48
49pub struct Channel {
52 runtime: Arc<Runtime>,
53 ack: bool,
54 chan_id: String,
55 pattern: String,
56 glob: (
57 globset::GlobMatcher,
58 globset::GlobMatcher,
59 globset::GlobMatcher,
60 globset::GlobMatcher,
61 globset::GlobMatcher,
62 ),
63}
64
65impl Channel {
66 pub fn new(rt: &Arc<Runtime>) -> Self {
67 Self::channel(rt, &ChannelOptions::default())
68 }
69
70 #[allow(clippy::self_named_constructors)]
73 pub fn channel(rt: &Arc<Runtime>, options: &ChannelOptions) -> Self {
74 debug!("channel: {options:?}");
75 let pat_type = globset::Glob::new(&options.r#type)
76 .unwrap()
77 .compile_matcher();
78 let pat_state = globset::Glob::new(&options.state)
79 .unwrap()
80 .compile_matcher();
81 let pat_tag = globset::Glob::new(&options.tag).unwrap().compile_matcher();
82 let pat_key = globset::Glob::new(&options.key).unwrap().compile_matcher();
83 let pat_uses = globset::Glob::new(&options.uses).unwrap().compile_matcher();
84 Self {
85 runtime: rt.clone(),
86 ack: options.ack,
87 chan_id: options.id.clone(),
88 pattern: options.pattern(),
89 glob: (pat_type, pat_state, pat_tag, pat_key, pat_uses),
90 }
91 }
92
93 pub fn on_message(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
122 let glob = self.glob.clone();
123 let runtime = self.runtime.clone();
124 let ack = self.ack;
125 let chan_id = self.chan_id.clone();
126 let pattern = self.pattern.clone();
127 self.runtime.emitter().on_message(&self.chan_id, move |e| {
128 info!("on_message: chan={} {e:?}", chan_id);
129 if is_match(&glob, e) {
130 store_if(&runtime, ack, &chan_id, &pattern, e);
131 f(e);
132 }
133 });
134 }
135
136 pub fn on_start(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
137 let glob = self.glob.clone();
138 let runtime = self.runtime.clone();
139 let ack = self.ack;
140 let chan_id = self.chan_id.clone();
141 let pattern = self.pattern.clone();
142 self.runtime.emitter().on_start(&self.chan_id, move |e| {
143 if is_match(&glob, e) {
144 store_if(&runtime, ack, &chan_id, &pattern, e);
145 f(e);
146 }
147 });
148 }
149
150 pub fn on_complete(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
151 let glob = self.glob.clone();
152 let runtime = self.runtime.clone();
153 let ack = self.ack;
154 let chan_id = self.chan_id.clone();
155 let pattern = self.pattern.clone();
156 self.runtime.emitter().on_complete(&self.chan_id, move |e| {
157 if is_match(&glob, e) {
158 store_if(&runtime, ack, &chan_id, &pattern, e);
159 f(e);
160 }
161 });
162 }
163
164 pub fn on_error(self: &Arc<Self>, f: impl Fn(&Event<Message>) + Send + Sync + 'static) {
165 let glob = self.glob.clone();
166 let runtime = self.runtime.clone();
167 let ack = self.ack;
168 let chan_id = self.chan_id.clone();
169 let pattern = self.pattern.clone();
170 self.runtime.emitter().on_error(&self.chan_id, move |e| {
171 if is_match(&glob, e) {
172 store_if(&runtime, ack, &chan_id, &pattern, e);
173 f(e);
174 }
175 });
176 }
177
178 pub fn close(&self) {
179 self.runtime.emitter().remove(&self.chan_id);
180 }
181}
182
183fn store_if(runtime: &Arc<Runtime>, ack: bool, chan_id: &str, pattern: &str, message: &Message) {
184 if ack && !chan_id.is_empty() && message.retry_times == 0 {
185 info!("store: {message:?}");
186 let msg = message.into(chan_id, pattern);
187 runtime
188 .cache()
189 .store()
190 .messages()
191 .create(&msg)
192 .unwrap_or_else(|err| {
193 error!("channel.store_if_emit_id: {}", err.to_string());
194 eprintln!("channel.store_if_emit_id: {}", err);
195 false
196 });
197 }
198}
199
200fn is_match(
201 glob: &(
202 globset::GlobMatcher,
203 globset::GlobMatcher,
204 globset::GlobMatcher,
205 globset::GlobMatcher,
206 globset::GlobMatcher,
207 ),
208 e: &Event<Message>,
209) -> bool {
210 let (pat_type, pat_state, pat_tag, pat_key, pat_uses) = glob;
211 pat_type.is_match(&e.r#type)
212 && pat_state.is_match(e.state.as_ref())
213 && (pat_tag.is_match(&e.tag) || pat_tag.is_match(&e.model.tag))
214 && pat_key.is_match(&e.key)
215 && pat_uses.is_match(&e.uses)
216}