1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Queue Type: Debounced
use deadqueue::limited::Queue;
use once_cell::sync::Lazy;
use std::{collections::HashMap, time::Duration};

use crate::{Database, PartialChannel};

use super::DelayedTask;

/// Task information
struct Data {
    /// Channel to update
    channel: String,
    /// Latest message ID
    id: String,
    /// Whether the channel is a DM
    is_dm: bool,
}

/// Task information
#[derive(Debug)]
struct Task {
    /// Latest message ID
    id: String,
    /// Whether the channel is a DM
    is_dm: bool,
}

static Q: Lazy<Queue<Data>> = Lazy::new(|| Queue::new(10_000));

/// Queue a new task for a worker
pub async fn queue(channel: String, id: String, is_dm: bool) {
    Q.try_push(Data { channel, id, is_dm }).ok();
    info!("Queue is using {} slots from {}.", Q.len(), Q.capacity());
}

/// Start a new worker
pub async fn worker(db: Database) {
    let mut tasks = HashMap::<String, DelayedTask<Task>>::new();
    let mut keys = vec![];

    loop {
        // Find due tasks.
        for (key, task) in &tasks {
            if task.should_run() {
                keys.push(key.clone());
            }
        }

        // Commit any due tasks to the database.
        for key in &keys {
            if let Some(task) = tasks.remove(key) {
                let Task { id, is_dm, .. } = task.data;

                let mut channel = PartialChannel {
                    last_message_id: Some(id.to_string()),
                    ..Default::default()
                };

                if is_dm {
                    channel.active = Some(true);
                }

                match db.update_channel(key, &channel, vec![]).await {
                    Ok(_) => info!("Updated last_message_id for {key} to {id}."),
                    Err(err) => error!("Failed to update last_message_id with {err:?}!"),
                }
            }
        }

        // Clear keys
        keys.clear();

        // Queue incoming tasks.
        while let Some(Data { channel, id, is_dm }) = Q.try_pop() {
            if let Some(task) = tasks.get_mut(&channel) {
                task.data.id = id;
                task.delay();
            } else {
                tasks.insert(channel, DelayedTask::new(Task { id, is_dm }));
            }
        }

        // Sleep for an arbitrary amount of time.
        async_std::task::sleep(Duration::from_secs(1)).await;
    }
}