dehashed_rs/
scheduler.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::mpsc::{Receiver, Sender};
5use tokio::sync::{mpsc, oneshot};
6use tokio::task::JoinHandle;
7use tokio::time::sleep;
8use tracing::warn;
9
10use crate::api::SearchResult;
11use crate::{DehashedApi, DehashedError, Query};
12
13/// A search request for the [Scheduler].
14#[derive(Debug)]
15pub struct ScheduledRequest {
16    query: Query,
17    ret: oneshot::Sender<Result<SearchResult, DehashedError>>,
18}
19
20impl ScheduledRequest {
21    /// Create a new request
22    ///
23    /// The [Scheduler] will sent the result back through the provided channel.
24    /// If sending fails, the result is dropped and the scheduler continues with the next request.
25    pub fn new(query: Query, ret: oneshot::Sender<Result<SearchResult, DehashedError>>) -> Self {
26        Self { query, ret }
27    }
28}
29
30/// The scheduler to manage with the rate limit of the unhashed api
31///
32/// Make sure that you just spawn one instance of the scheduler.
33/// You can receive and schedule as many requests as you like on the instance.
34#[derive(Clone)]
35pub struct Scheduler {
36    handle: Arc<JoinHandle<()>>,
37    tx: Sender<ScheduledRequest>,
38}
39
40impl Scheduler {
41    pub(crate) fn new(api: &DehashedApi) -> Self {
42        let (tx, rx) = mpsc::channel(5);
43
44        let mut rx: Receiver<ScheduledRequest> = rx;
45        let task_api = api.clone();
46        let handle = tokio::spawn(async move {
47            while let Some(req) = rx.recv().await {
48                let res = task_api.search(req.query).await;
49                if req.ret.send(res).is_err() {
50                    warn!("Couldn't send result back through channel");
51                }
52                sleep(Duration::from_millis(200)).await;
53            }
54        });
55        Self {
56            tx,
57            handle: Arc::new(handle),
58        }
59    }
60
61    /// Retrieve a [Sender] to allow pushing tasks to the scheduler.
62    ///
63    /// To use multiple senders, you can clone the one you've received or
64    /// retrieve a new one using this method
65    pub fn retrieve_sender(&self) -> Sender<ScheduledRequest> {
66        self.tx.clone()
67    }
68
69    /// Stop the [Scheduler].
70    ///
71    /// This will abort the tokio task.
72    pub fn stop_scheduler(self) {
73        self.handle.abort();
74    }
75}