1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
use std::sync::Arc;
use std::time::Duration;

use log::warn;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::sleep;

use crate::api::SearchResult;
use crate::{DehashedApi, DehashedError, Query};

/// A search request for the [Scheduler].
#[derive(Debug)]
pub struct ScheduledRequest {
    query: Query,
    ret: oneshot::Sender<Result<SearchResult, DehashedError>>,
}

impl ScheduledRequest {
    /// Create a new request
    ///
    /// The [Scheduler] will sent the result back through the provided channel.
    /// If sending fails, the result is dropped and the scheduler continues with the next request.
    pub fn new(query: Query, ret: oneshot::Sender<Result<SearchResult, DehashedError>>) -> Self {
        Self { query, ret }
    }
}

/// The scheduler to manage with the rate limit of the unhashed api
///
/// Make sure that you just spawn one instance of the scheduler.
/// You can receive and schedule as many requests as you like on the instance.
#[derive(Clone)]
pub struct Scheduler {
    handle: Arc<JoinHandle<()>>,
    tx: Sender<ScheduledRequest>,
}

impl Scheduler {
    pub(crate) fn new(api: &DehashedApi) -> Self {
        let (tx, rx) = mpsc::channel(5);

        let mut rx: Receiver<ScheduledRequest> = rx;
        let task_api = api.clone();
        let handle = tokio::spawn(async move {
            while let Some(req) = rx.recv().await {
                let res = task_api.search(req.query).await;
                if req.ret.send(res).is_err() {
                    warn!("Couldn't send result back through channel");
                }
                sleep(Duration::from_millis(200)).await;
            }
        });
        Self {
            tx,
            handle: Arc::new(handle),
        }
    }

    /// Retrieve a [Sender] to allow pushing tasks to the scheduler.
    ///
    /// To use multiple senders, you can clone the one you've received or
    /// retrieve a new one using this method
    pub fn retrieve_sender(&self) -> Sender<ScheduledRequest> {
        self.tx.clone()
    }

    /// Stop the [Scheduler].
    ///
    /// This will abort the tokio task.
    pub fn stop_scheduler(self) {
        self.handle.abort();
    }
}