revolt_database/tasks/
process_embeds.rs

1use crate::{models::Message, AppendMessage, Database};
2
3use futures::future::join_all;
4use linkify::{LinkFinder, LinkKind};
5use regex::Regex;
6use revolt_config::config;
7use revolt_result::Result;
8
9use async_lock::Semaphore;
10use async_std::task::spawn;
11use deadqueue::limited::Queue;
12use once_cell::sync::Lazy;
13use revolt_models::v0::Embed;
14use std::{collections::HashSet, sync::Arc};
15
16use isahc::prelude::*;
17
/// A single unit of work for the embed workers: one message whose
/// content should be scanned for links.
#[derive(Debug)]
struct EmbedTask {
    /// Channel in which the message is being processed
    channel: String,
    /// ID of the message being processed
    id: String,
    /// Text content of the message
    content: String,
}
28
29static Q: Lazy<Queue<EmbedTask>> = Lazy::new(|| Queue::new(10_000));
30
31/// Queue a new task for a worker
32pub async fn queue(channel: String, id: String, content: String) {
33    Q.try_push(EmbedTask {
34        channel,
35        id,
36        content,
37    })
38    .ok();
39
40    info!("Queue is using {} slots from {}.", Q.len(), Q.capacity());
41}
42
43/// Start a new worker
44pub async fn worker(db: Database) {
45    let semaphore = Arc::new(Semaphore::new(
46        config().await.api.workers.max_concurrent_connections,
47    ));
48
49    loop {
50        let task = Q.pop().await;
51        let db = db.clone();
52        let semaphore = semaphore.clone();
53
54        spawn(async move {
55            let config = config().await;
56            let embeds = generate(
57                task.content,
58                &config.hosts.january,
59                config.features.limits.global.message_embeds,
60                semaphore,
61            )
62            .await;
63
64            if let Ok(embeds) = embeds {
65                if let Err(err) = Message::append(
66                    &db,
67                    task.id,
68                    task.channel,
69                    AppendMessage {
70                        embeds: Some(embeds),
71                    },
72                )
73                .await
74                {
75                    error!("Encountered an error appending to message: {:?}", err);
76                }
77            }
78        });
79    }
80}
81
82static RE_CODE: Lazy<Regex> = Lazy::new(|| Regex::new("```(?:.|\n)+?```|`(?:.|\n)+?`").unwrap());
83static RE_IGNORED: Lazy<Regex> = Lazy::new(|| Regex::new("(<http.+>)").unwrap());
84
85pub async fn generate(
86    content: String,
87    host: &str,
88    max_embeds: usize,
89    semaphore: Arc<Semaphore>,
90) -> Result<Vec<Embed>> {
91    // Ignore code blocks.
92    let content = RE_CODE.replace_all(&content, "");
93
94    // Ignore all content between angle brackets starting with http.
95    let content = RE_IGNORED.replace_all(&content, "");
96
97    let content = content
98        // Ignore quoted lines.
99        .split('\n')
100        .map(|v| {
101            if let Some(c) = v.chars().next() {
102                if c == '>' {
103                    return "";
104                }
105            }
106
107            v
108        })
109        .collect::<Vec<&str>>()
110        .join("\n");
111
112    let mut finder = LinkFinder::new();
113    finder.kinds(&[LinkKind::Url]);
114
115    // Process all links, stripping anchors and
116    // only taking up to `max_embeds` of links.
117    let links: Vec<String> = finder
118        .links(&content)
119        .map(|x| {
120            x.as_str()
121                .chars()
122                .take_while(|&ch| ch != '#')
123                .collect::<String>()
124        })
125        .collect::<HashSet<String>>()
126        .into_iter()
127        .take(max_embeds)
128        .collect();
129
130    // If no links, fail out.
131    if links.is_empty() {
132        return Err(create_error!(LabelMe));
133    }
134
135    // TODO: batch request to january
136    let mut tasks = Vec::new();
137
138    for link in links {
139        let semaphore = semaphore.clone();
140        let host = host.to_string();
141        tasks.push(spawn(async move {
142            let guard = semaphore.acquire().await;
143
144            if let Ok(mut response) = isahc::get_async(format!(
145                "{host}/embed?url={}",
146                url_escape::encode_component(&link)
147            ))
148            .await
149            {
150                drop(guard);
151                response.json::<Embed>().await.ok()
152            } else {
153                None
154            }
155        }));
156    }
157
158    let embeds = join_all(tasks)
159        .await
160        .into_iter()
161        .flatten()
162        .collect::<Vec<Embed>>();
163
164    // Prevent database update when no embeds are found.
165    if !embeds.is_empty() {
166        Ok(embeds)
167    } else {
168        Err(create_error!(LabelMe))
169    }
170}