revolt_database/tasks/
last_message_id.rs1use deadqueue::limited::Queue;
3use once_cell::sync::Lazy;
4use std::{collections::HashMap, time::Duration};
5
6use crate::{Database, PartialChannel};
7
8use super::DelayedTask;
9
/// A single `last_message_id` update request as it travels through the queue.
///
/// `Debug` is derived (matching `Task`) so queued items can be logged or
/// inspected when diagnosing the worker.
#[derive(Debug)]
struct Data {
    /// ID of the channel to update; the worker uses it as the key for pending tasks
    /// and as the ID passed to the database update.
    channel: String,
    /// Message ID to store as the channel's `last_message_id`.
    id: String,
    /// When `true`, the worker also sets the channel's `active` field to `Some(true)`.
    is_dm: bool,
}
19
/// Pending update held by the worker for one channel.
///
/// The channel ID itself is not stored here; it is the key of the worker's task map.
#[derive(Debug)]
struct Task {
    /// Most recent message ID received for the channel; overwritten whenever a newer
    /// request for the same channel is popped from the queue.
    id: String,
    /// Taken from the first request that created this task; when `true` the worker
    /// additionally sets the channel's `active` field to `Some(true)`.
    is_dm: bool,
}
28
29static Q: Lazy<Queue<Data>> = Lazy::new(|| Queue::new(10_000));
30
31pub async fn queue(channel: String, id: String, is_dm: bool) {
33 Q.try_push(Data { channel, id, is_dm }).ok();
34 info!("Queue is using {} slots from {}.", Q.len(), Q.capacity());
35}
36
37pub async fn worker(db: Database) {
39 let mut tasks = HashMap::<String, DelayedTask<Task>>::new();
40 let mut keys = vec![];
41
42 loop {
43 for (key, task) in &tasks {
45 if task.should_run() {
46 keys.push(key.clone());
47 }
48 }
49
50 for key in &keys {
52 if let Some(task) = tasks.remove(key) {
53 let Task { id, is_dm, .. } = task.data;
54
55 let mut channel = PartialChannel {
56 last_message_id: Some(id.to_string()),
57 ..Default::default()
58 };
59
60 if is_dm {
61 channel.active = Some(true);
62 }
63
64 match db.update_channel(key, &channel, vec![]).await {
65 Ok(_) => info!("Updated last_message_id for {key} to {id}."),
66 Err(err) => error!("Failed to update last_message_id with {err:?}!"),
67 }
68 }
69 }
70
71 keys.clear();
73
74 while let Some(Data { channel, id, is_dm }) = Q.try_pop() {
76 if let Some(task) = tasks.get_mut(&channel) {
77 task.data.id = id;
78 task.delay();
79 } else {
80 tasks.insert(channel, DelayedTask::new(Task { id, is_dm }));
81 }
82 }
83
84 async_std::task::sleep(Duration::from_secs(1)).await;
86 }
87}