tulpje_framework/
framework.rs

1use std::{future::Future, pin::Pin, sync::Arc};
2
3use tokio::{sync::mpsc, task::JoinHandle};
4
5use tokio_util::{sync::CancellationToken, task::TaskTracker};
6use crate::Metadata;
7use twilight_gateway::Event;
8use twilight_http::Client;
9use twilight_model::id::{marker::ApplicationMarker, Id};
10
11use crate::handler::task_handler::TaskHandler;
12use crate::scheduler::{SchedulerHandle, SchedulerTaskMessage};
13use crate::{Context, Error, Registry};
14
15type SetupFunc<T> = fn(ctx: Context<T>) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
16
17#[derive(Clone)]
18pub struct FrameworkBuilder<T: Clone + Send + Sync> {
19    registry: Arc<Registry<T>>,
20    client: Arc<Client>,
21    app_id: Id<ApplicationMarker>,
22    user_data: Arc<T>,
23
24    setup_fn: Option<SetupFunc<T>>,
25}
26
27impl<T: Clone + Send + Sync + 'static> FrameworkBuilder<T> {
28    pub fn new(
29        registry: Arc<Registry<T>>,
30        client: Client,
31        app_id: Id<ApplicationMarker>,
32        user_data: T,
33    ) -> Self {
34        Self {
35            registry,
36            client: Arc::new(client),
37            app_id,
38            user_data: Arc::new(user_data),
39            setup_fn: None,
40        }
41    }
42
43    pub fn setup(&mut self, func: SetupFunc<T>) -> &mut Self {
44        self.setup_fn = Some(func);
45        self
46    }
47
48    pub fn build(&self) -> Framework<T> {
49        Framework::new(
50            Arc::clone(&self.registry),
51            Arc::clone(&self.client),
52            self.app_id,
53            Arc::clone(&self.user_data),
54            self.setup_fn,
55        )
56    }
57}
58
59pub struct Framework<T: Clone + Send + Sync> {
60    ctx: Context<T>,
61    setup_fn: Option<SetupFunc<T>>,
62
63    scheduler: SchedulerHandle<T>,
64    dispatcher: DispatchHandle,
65}
66
67impl<T: Clone + Send + Sync + 'static> Framework<T> {
68    pub fn new(
69        registry: Arc<Registry<T>>,
70        client: Arc<Client>,
71        application_id: Id<ApplicationMarker>,
72        services: Arc<T>,
73        setup_fn: Option<SetupFunc<T>>,
74    ) -> Self {
75        let ctx = Context {
76            application_id,
77            services,
78            client,
79        };
80        let scheduler =
81            SchedulerHandle::new(registry.tasks.values().cloned().collect(), ctx.clone());
82        let dispatcher = DispatchHandle::new(registry, ctx.clone());
83
84        Self {
85            ctx,
86            setup_fn,
87
88            scheduler,
89            dispatcher,
90        }
91    }
92
93    pub async fn start(&mut self) -> Result<(), Error> {
94        if let Some(setup_fn) = self.setup_fn.take() {
95            (setup_fn)(self.ctx.clone())
96                .await
97                .map_err(|err| format!("error running setup function: {}", err))?;
98        }
99
100        self.scheduler
101            .start()
102            .map_err(|err| format!("error starting scheduled tasks: {}", err))?;
103
104        Ok(())
105    }
106
107    pub fn enable_task(
108        &mut self,
109        handler: TaskHandler<T>,
110    ) -> Result<(), mpsc::error::SendError<SchedulerTaskMessage<T>>> {
111        self.scheduler.enable_task(handler)
112    }
113
114    pub fn disable_task(
115        &mut self,
116        name: String,
117    ) -> Result<(), mpsc::error::SendError<SchedulerTaskMessage<T>>> {
118        self.scheduler.disable_task(name)
119    }
120
121    pub fn sender(&self) -> Sender {
122        Sender {
123            sender: self.dispatcher.sender.clone(),
124        }
125    }
126
127    pub fn send(
128        &mut self,
129        meta: Metadata,
130        event: Event,
131    ) -> Result<(), mpsc::error::SendError<(Metadata, Event)>> {
132        self.dispatcher.send(meta, event)
133    }
134
135    pub async fn shutdown(&mut self) {
136        self.scheduler.shutdown();
137        self.dispatcher.shutdown();
138    }
139
140    pub async fn join(&mut self) -> Result<(), Error> {
141        self.scheduler.join().await?;
142        self.dispatcher.join().await?;
143
144        Ok(())
145    }
146}
147
148struct DispatchHandle {
149    sender: mpsc::UnboundedSender<(Metadata, Event)>,
150    shutdown: CancellationToken,
151    handle: Option<JoinHandle<()>>,
152}
153impl DispatchHandle {
154    fn new<T: Clone + Send + Sync + 'static>(registry: Arc<Registry<T>>, ctx: Context<T>) -> Self {
155        let (sender, receiver) = mpsc::unbounded_channel();
156        let shutdown = CancellationToken::new();
157
158        let mut dispatch = Dispatch::new(ctx, registry, receiver, shutdown.child_token());
159        let handle = Some(tokio::spawn(async move { dispatch.run().await }));
160
161        Self {
162            sender,
163            shutdown,
164            handle,
165        }
166    }
167
168    fn send(
169        &mut self,
170        meta: Metadata,
171        event: Event,
172    ) -> Result<(), mpsc::error::SendError<(Metadata, Event)>> {
173        self.sender.send((meta, event))
174    }
175
176    fn shutdown(&mut self) {
177        self.shutdown.cancel();
178    }
179
180    async fn join(&mut self) -> Result<(), Error> {
181        Ok(self
182            .handle
183            .take()
184            .ok_or("Dispatch already shutdown")?
185            .await?)
186    }
187}
188
189struct Dispatch<T: Clone + Send + Sync> {
190    registry: Arc<Registry<T>>,
191    ctx: Context<T>,
192
193    receiver: mpsc::UnboundedReceiver<(Metadata, Event)>,
194    shutdown: CancellationToken,
195
196    tracker: TaskTracker,
197}
198impl<T: Clone + Send + Sync + 'static> Dispatch<T> {
199    fn new(
200        ctx: Context<T>,
201        registry: Arc<Registry<T>>,
202
203        receiver: mpsc::UnboundedReceiver<(Metadata, Event)>,
204        shutdown: CancellationToken,
205    ) -> Self {
206        Self {
207            registry,
208            ctx,
209
210            receiver,
211            shutdown,
212
213            tracker: TaskTracker::new(),
214        }
215    }
216
217    async fn run(&mut self) {
218        loop {
219            tokio::select! {
220                Some((meta, event)) = self.receiver.recv() => {
221                    let registry = Arc::clone(&self.registry);
222                    let ctx = self.ctx.clone();
223
224                    self.tracker.spawn(async move {
225                        crate::handle(meta, ctx, &registry, event).await;
226                    });
227                },
228                () = self.shutdown.cancelled() => break,
229            }
230        }
231
232        self.receiver.close();
233        self.tracker.close();
234
235        self.tracker.wait().await;
236    }
237}
238
239pub struct Sender {
240    sender: mpsc::UnboundedSender<(Metadata, Event)>,
241}
242
243impl Sender {
244    pub fn send(
245        &self,
246        meta: Metadata,
247        event: Event,
248    ) -> Result<(), mpsc::error::SendError<(Metadata, Event)>> {
249        self.sender.send((meta, event))
250    }
251
252    pub fn closed(&self) -> bool {
253        self.sender.is_closed()
254    }
255}