1use std::{future::Future, pin::Pin, sync::Arc};
2
3use tokio::{sync::mpsc, task::JoinHandle};
4
5use tokio_util::{sync::CancellationToken, task::TaskTracker};
6use crate::Metadata;
7use twilight_gateway::Event;
8use twilight_http::Client;
9use twilight_model::id::{marker::ApplicationMarker, Id};
10
11use crate::handler::task_handler::TaskHandler;
12use crate::scheduler::{SchedulerHandle, SchedulerTaskMessage};
13use crate::{Context, Error, Registry};
14
15type SetupFunc<T> = fn(ctx: Context<T>) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
16
17#[derive(Clone)]
18pub struct FrameworkBuilder<T: Clone + Send + Sync> {
19 registry: Arc<Registry<T>>,
20 client: Arc<Client>,
21 app_id: Id<ApplicationMarker>,
22 user_data: Arc<T>,
23
24 setup_fn: Option<SetupFunc<T>>,
25}
26
27impl<T: Clone + Send + Sync + 'static> FrameworkBuilder<T> {
28 pub fn new(
29 registry: Arc<Registry<T>>,
30 client: Client,
31 app_id: Id<ApplicationMarker>,
32 user_data: T,
33 ) -> Self {
34 Self {
35 registry,
36 client: Arc::new(client),
37 app_id,
38 user_data: Arc::new(user_data),
39 setup_fn: None,
40 }
41 }
42
43 pub fn setup(&mut self, func: SetupFunc<T>) -> &mut Self {
44 self.setup_fn = Some(func);
45 self
46 }
47
48 pub fn build(&self) -> Framework<T> {
49 Framework::new(
50 Arc::clone(&self.registry),
51 Arc::clone(&self.client),
52 self.app_id,
53 Arc::clone(&self.user_data),
54 self.setup_fn,
55 )
56 }
57}
58
59pub struct Framework<T: Clone + Send + Sync> {
60 ctx: Context<T>,
61 setup_fn: Option<SetupFunc<T>>,
62
63 scheduler: SchedulerHandle<T>,
64 dispatcher: DispatchHandle,
65}
66
67impl<T: Clone + Send + Sync + 'static> Framework<T> {
68 pub fn new(
69 registry: Arc<Registry<T>>,
70 client: Arc<Client>,
71 application_id: Id<ApplicationMarker>,
72 services: Arc<T>,
73 setup_fn: Option<SetupFunc<T>>,
74 ) -> Self {
75 let ctx = Context {
76 application_id,
77 services,
78 client,
79 };
80 let scheduler =
81 SchedulerHandle::new(registry.tasks.values().cloned().collect(), ctx.clone());
82 let dispatcher = DispatchHandle::new(registry, ctx.clone());
83
84 Self {
85 ctx,
86 setup_fn,
87
88 scheduler,
89 dispatcher,
90 }
91 }
92
93 pub async fn start(&mut self) -> Result<(), Error> {
94 if let Some(setup_fn) = self.setup_fn.take() {
95 (setup_fn)(self.ctx.clone())
96 .await
97 .map_err(|err| format!("error running setup function: {}", err))?;
98 }
99
100 self.scheduler
101 .start()
102 .map_err(|err| format!("error starting scheduled tasks: {}", err))?;
103
104 Ok(())
105 }
106
107 pub fn enable_task(
108 &mut self,
109 handler: TaskHandler<T>,
110 ) -> Result<(), mpsc::error::SendError<SchedulerTaskMessage<T>>> {
111 self.scheduler.enable_task(handler)
112 }
113
114 pub fn disable_task(
115 &mut self,
116 name: String,
117 ) -> Result<(), mpsc::error::SendError<SchedulerTaskMessage<T>>> {
118 self.scheduler.disable_task(name)
119 }
120
121 pub fn sender(&self) -> Sender {
122 Sender {
123 sender: self.dispatcher.sender.clone(),
124 }
125 }
126
127 pub fn send(
128 &mut self,
129 meta: Metadata,
130 event: Event,
131 ) -> Result<(), mpsc::error::SendError<(Metadata, Event)>> {
132 self.dispatcher.send(meta, event)
133 }
134
135 pub async fn shutdown(&mut self) {
136 self.scheduler.shutdown();
137 self.dispatcher.shutdown();
138 }
139
140 pub async fn join(&mut self) -> Result<(), Error> {
141 self.scheduler.join().await?;
142 self.dispatcher.join().await?;
143
144 Ok(())
145 }
146}
147
148struct DispatchHandle {
149 sender: mpsc::UnboundedSender<(Metadata, Event)>,
150 shutdown: CancellationToken,
151 handle: Option<JoinHandle<()>>,
152}
153impl DispatchHandle {
154 fn new<T: Clone + Send + Sync + 'static>(registry: Arc<Registry<T>>, ctx: Context<T>) -> Self {
155 let (sender, receiver) = mpsc::unbounded_channel();
156 let shutdown = CancellationToken::new();
157
158 let mut dispatch = Dispatch::new(ctx, registry, receiver, shutdown.child_token());
159 let handle = Some(tokio::spawn(async move { dispatch.run().await }));
160
161 Self {
162 sender,
163 shutdown,
164 handle,
165 }
166 }
167
168 fn send(
169 &mut self,
170 meta: Metadata,
171 event: Event,
172 ) -> Result<(), mpsc::error::SendError<(Metadata, Event)>> {
173 self.sender.send((meta, event))
174 }
175
176 fn shutdown(&mut self) {
177 self.shutdown.cancel();
178 }
179
180 async fn join(&mut self) -> Result<(), Error> {
181 Ok(self
182 .handle
183 .take()
184 .ok_or("Dispatch already shutdown")?
185 .await?)
186 }
187}
188
189struct Dispatch<T: Clone + Send + Sync> {
190 registry: Arc<Registry<T>>,
191 ctx: Context<T>,
192
193 receiver: mpsc::UnboundedReceiver<(Metadata, Event)>,
194 shutdown: CancellationToken,
195
196 tracker: TaskTracker,
197}
198impl<T: Clone + Send + Sync + 'static> Dispatch<T> {
199 fn new(
200 ctx: Context<T>,
201 registry: Arc<Registry<T>>,
202
203 receiver: mpsc::UnboundedReceiver<(Metadata, Event)>,
204 shutdown: CancellationToken,
205 ) -> Self {
206 Self {
207 registry,
208 ctx,
209
210 receiver,
211 shutdown,
212
213 tracker: TaskTracker::new(),
214 }
215 }
216
217 async fn run(&mut self) {
218 loop {
219 tokio::select! {
220 Some((meta, event)) = self.receiver.recv() => {
221 let registry = Arc::clone(&self.registry);
222 let ctx = self.ctx.clone();
223
224 self.tracker.spawn(async move {
225 crate::handle(meta, ctx, ®istry, event).await;
226 });
227 },
228 () = self.shutdown.cancelled() => break,
229 }
230 }
231
232 self.receiver.close();
233 self.tracker.close();
234
235 self.tracker.wait().await;
236 }
237}
238
239pub struct Sender {
240 sender: mpsc::UnboundedSender<(Metadata, Event)>,
241}
242
243impl Sender {
244 pub fn send(
245 &self,
246 meta: Metadata,
247 event: Event,
248 ) -> Result<(), mpsc::error::SendError<(Metadata, Event)>> {
249 self.sender.send((meta, event))
250 }
251
252 pub fn closed(&self) -> bool {
253 self.sender.is_closed()
254 }
255}