revolt_database/tasks/
process_embeds.rs1use crate::{models::Message, AppendMessage, Database};
2
3use futures::future::join_all;
4use linkify::{LinkFinder, LinkKind};
5use regex::Regex;
6use revolt_config::config;
7use revolt_result::Result;
8
9use async_lock::Semaphore;
10use async_std::task::spawn;
11use deadqueue::limited::Queue;
12use once_cell::sync::Lazy;
13use revolt_models::v0::Embed;
14use std::{collections::HashSet, sync::Arc};
15
16use isahc::prelude::*;
17
/// A unit of work for the embed worker: one message whose content should be
/// scanned for links and enriched with embeds.
#[derive(Debug)]
struct EmbedTask {
    /// Channel identifier forwarded to `Message::append`.
    channel: String,
    /// Identifier forwarded to `Message::append` (presumably the message ID).
    id: String,
    /// Raw text that is scanned for links.
    content: String,
}
28
29static Q: Lazy<Queue<EmbedTask>> = Lazy::new(|| Queue::new(10_000));
30
31pub async fn queue(channel: String, id: String, content: String) {
33 Q.try_push(EmbedTask {
34 channel,
35 id,
36 content,
37 })
38 .ok();
39
40 info!("Queue is using {} slots from {}.", Q.len(), Q.capacity());
41}
42
43pub async fn worker(db: Database) {
45 let semaphore = Arc::new(Semaphore::new(
46 config().await.api.workers.max_concurrent_connections,
47 ));
48
49 loop {
50 let task = Q.pop().await;
51 let db = db.clone();
52 let semaphore = semaphore.clone();
53
54 spawn(async move {
55 let config = config().await;
56 let embeds = generate(
57 task.content,
58 &config.hosts.january,
59 config.features.limits.global.message_embeds,
60 semaphore,
61 )
62 .await;
63
64 if let Ok(embeds) = embeds {
65 if let Err(err) = Message::append(
66 &db,
67 task.id,
68 task.channel,
69 AppendMessage {
70 embeds: Some(embeds),
71 },
72 )
73 .await
74 {
75 error!("Encountered an error appending to message: {:?}", err);
76 }
77 }
78 });
79 }
80}
81
82static RE_CODE: Lazy<Regex> = Lazy::new(|| Regex::new("```(?:.|\n)+?```|`(?:.|\n)+?`").unwrap());
83static RE_IGNORED: Lazy<Regex> = Lazy::new(|| Regex::new("(<http.+>)").unwrap());
84
85pub async fn generate(
86 content: String,
87 host: &str,
88 max_embeds: usize,
89 semaphore: Arc<Semaphore>,
90) -> Result<Vec<Embed>> {
91 let content = RE_CODE.replace_all(&content, "");
93
94 let content = RE_IGNORED.replace_all(&content, "");
96
97 let content = content
98 .split('\n')
100 .map(|v| {
101 if let Some(c) = v.chars().next() {
102 if c == '>' {
103 return "";
104 }
105 }
106
107 v
108 })
109 .collect::<Vec<&str>>()
110 .join("\n");
111
112 let mut finder = LinkFinder::new();
113 finder.kinds(&[LinkKind::Url]);
114
115 let links: Vec<String> = finder
118 .links(&content)
119 .map(|x| {
120 x.as_str()
121 .chars()
122 .take_while(|&ch| ch != '#')
123 .collect::<String>()
124 })
125 .collect::<HashSet<String>>()
126 .into_iter()
127 .take(max_embeds)
128 .collect();
129
130 if links.is_empty() {
132 return Err(create_error!(LabelMe));
133 }
134
135 let mut tasks = Vec::new();
137
138 for link in links {
139 let semaphore = semaphore.clone();
140 let host = host.to_string();
141 tasks.push(spawn(async move {
142 let guard = semaphore.acquire().await;
143
144 if let Ok(mut response) = isahc::get_async(format!(
145 "{host}/embed?url={}",
146 url_escape::encode_component(&link)
147 ))
148 .await
149 {
150 drop(guard);
151 response.json::<Embed>().await.ok()
152 } else {
153 None
154 }
155 }));
156 }
157
158 let embeds = join_all(tasks)
159 .await
160 .into_iter()
161 .flatten()
162 .collect::<Vec<Embed>>();
163
164 if !embeds.is_empty() {
166 Ok(embeds)
167 } else {
168 Err(create_error!(LabelMe))
169 }
170}