1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::mpsc::{Receiver, Sender};
5use tokio::sync::{mpsc, oneshot};
6use tokio::task::JoinHandle;
7use tokio::time::sleep;
8use tracing::warn;
9
10use crate::api::SearchResult;
11use crate::{DehashedApi, DehashedError, Query};
12
13#[derive(Debug)]
15pub struct ScheduledRequest {
16 query: Query,
17 ret: oneshot::Sender<Result<SearchResult, DehashedError>>,
18}
19
20impl ScheduledRequest {
21 pub fn new(query: Query, ret: oneshot::Sender<Result<SearchResult, DehashedError>>) -> Self {
26 Self { query, ret }
27 }
28}
29
30#[derive(Clone)]
35pub struct Scheduler {
36 handle: Arc<JoinHandle<()>>,
37 tx: Sender<ScheduledRequest>,
38}
39
40impl Scheduler {
41 pub(crate) fn new(api: &DehashedApi) -> Self {
42 let (tx, rx) = mpsc::channel(5);
43
44 let mut rx: Receiver<ScheduledRequest> = rx;
45 let task_api = api.clone();
46 let handle = tokio::spawn(async move {
47 while let Some(req) = rx.recv().await {
48 let res = task_api.search(req.query).await;
49 if req.ret.send(res).is_err() {
50 warn!("Couldn't send result back through channel");
51 }
52 sleep(Duration::from_millis(200)).await;
53 }
54 });
55 Self {
56 tx,
57 handle: Arc::new(handle),
58 }
59 }
60
61 pub fn retrieve_sender(&self) -> Sender<ScheduledRequest> {
66 self.tx.clone()
67 }
68
69 pub fn stop_scheduler(self) {
73 self.handle.abort();
74 }
75}