revolt_database/tasks/
last_message_id.rs

1// Queue Type: Debounced
2use deadqueue::limited::Queue;
3use once_cell::sync::Lazy;
4use std::{collections::HashMap, time::Duration};
5
6use crate::{Database, PartialChannel};
7
8use super::DelayedTask;
9
/// A pending `last_message_id` update as it travels through the queue.
///
/// One value is pushed per call to `queue`; the worker folds values for the
/// same channel together before committing them to the database.
#[derive(Debug)]
struct Data {
    /// ID of the channel to update
    channel: String,
    /// ID of the latest message in that channel
    id: String,
    /// Whether the channel is a DM
    is_dm: bool,
}
19
/// Per-channel state held by the worker until the update is committed.
///
/// The channel ID is not stored here; the worker keeps these values in a map
/// keyed by channel ID.
#[derive(Debug)]
struct Task {
    /// Most recent message ID received for the channel
    id: String,
    /// Whether the channel is a DM
    is_dm: bool,
}
28
29static Q: Lazy<Queue<Data>> = Lazy::new(|| Queue::new(10_000));
30
31/// Queue a new task for a worker
32pub async fn queue(channel: String, id: String, is_dm: bool) {
33    Q.try_push(Data { channel, id, is_dm }).ok();
34    info!("Queue is using {} slots from {}.", Q.len(), Q.capacity());
35}
36
37/// Start a new worker
38pub async fn worker(db: Database) {
39    let mut tasks = HashMap::<String, DelayedTask<Task>>::new();
40    let mut keys = vec![];
41
42    loop {
43        // Find due tasks.
44        for (key, task) in &tasks {
45            if task.should_run() {
46                keys.push(key.clone());
47            }
48        }
49
50        // Commit any due tasks to the database.
51        for key in &keys {
52            if let Some(task) = tasks.remove(key) {
53                let Task { id, is_dm, .. } = task.data;
54
55                let mut channel = PartialChannel {
56                    last_message_id: Some(id.to_string()),
57                    ..Default::default()
58                };
59
60                if is_dm {
61                    channel.active = Some(true);
62                }
63
64                match db.update_channel(key, &channel, vec![]).await {
65                    Ok(_) => info!("Updated last_message_id for {key} to {id}."),
66                    Err(err) => error!("Failed to update last_message_id with {err:?}!"),
67                }
68            }
69        }
70
71        // Clear keys
72        keys.clear();
73
74        // Queue incoming tasks.
75        while let Some(Data { channel, id, is_dm }) = Q.try_pop() {
76            if let Some(task) = tasks.get_mut(&channel) {
77                task.data.id = id;
78                task.delay();
79            } else {
80                tasks.insert(channel, DelayedTask::new(Task { id, is_dm }));
81            }
82        }
83
84        // Sleep for an arbitrary amount of time.
85        async_std::task::sleep(Duration::from_secs(1)).await;
86    }
87}