kovi_plugin_mail/
lib.rs

1mod config;
2
3use std::{collections::HashMap, sync::Arc};
4
5use async_imap::{Session, types::Fetch};
6use async_native_tls::TlsStream;
7use futures::TryStreamExt;
8use kovi::{
9    PluginBuilder as plugin, RuntimeBot,
10    chrono::{DateTime, FixedOffset, Utc},
11    log::{info, warn},
12    tokio::{net::TcpStream, sync::RwLock, time},
13};
14
15use crate::config::MailConfig;
16
/// An `async_imap` session running over a TLS-wrapped TCP stream.
type MailSession = Session<TlsStream<TcpStream>>;
/// Registry of sessions that are currently open. `check_mails` keys each entry by the
/// account's e-mail address; every session sits behind its own lock so it can be shared
/// with the drop handler.
type MailSessions = HashMap<String, Arc<RwLock<MailSession>>>;
19
20struct State {
21    date: DateTime<FixedOffset>,
22}
23
/// Summary of a single fetched mail, as produced by `pull_mails`.
#[derive(Debug)]
struct MailInfo {
    /// Subject line after `encoded_words` decoding.
    subject: String,
    /// The value returned by `Fetch::internal_date()` for the message.
    date: DateTime<FixedOffset>,
}
29
30#[kovi::plugin]
31async fn main() {
32    let bot = plugin::get_runtime_bot();
33    let config = config::init(bot.get_data_path()).await.unwrap();
34
35    let sessions: Arc<RwLock<MailSessions>> = Arc::new(RwLock::new(MailSessions::new()));
36
37    info!("[mail] Connecting to mail servers.");
38
39    for cfg in config.mails {
40        let state = State {
41            date: Utc::now().fixed_offset(),
42        };
43        let state = Arc::new(RwLock::new(state));
44
45        info!("[mail] {} initialized.", &cfg.email);
46
47        plugin::cron(&format!("0 0/{} * * * ?", config.interval), {
48            let bot = bot.clone();
49            let state = state.clone();
50            let sessions = sessions.clone();
51            move || check_mails(cfg.clone(), bot.clone(), sessions.clone(), state.clone())
52        })
53        .unwrap();
54    }
55
56    plugin::drop({
57        let sessions = sessions.clone();
58        move || on_drop(sessions.clone())
59    });
60
61    info!("[mail] Ready to put eyes on mails!")
62}
63
64async fn pull_mails(session: &Arc<RwLock<MailSession>>) -> Result<MailInfo, String> {
65    let mut session = session.write().await;
66    let mails = session
67        .fetch("*", "ALL")
68        .await
69        .map_err(|e| format!("Error when pulling mails: {}", e))?;
70    let messages: Vec<Fetch> = mails
71        .try_collect()
72        .await
73        .map_err(|e| format!("Error when pulling mails: {}", e))?;
74    let fetch = messages.last().ok_or("No available mail found!")?;
75    Ok(MailInfo {
76        subject: {
77            let sub_bytes = &fetch
78                .envelope()
79                .ok_or("No envelop found for the latest mail!")?
80                .subject
81                .clone()
82                .ok_or("No subject found for the latest mail!")?;
83            encoded_words::decode(
84                str::from_utf8(&sub_bytes)
85                    .map_err(|e| format!("Invalid UTF-8 Encoding of Mail Subject: {}", e))?,
86            )
87            .map_err(|e| format!("Error when decoding subject: {}", e))?
88            .decoded
89        },
90        date: fetch
91            .internal_date()
92            .ok_or("No date found for the latest mail!")?,
93    })
94}
95
96async fn check_mails(
97    cfg: MailConfig,
98    bot: Arc<RuntimeBot>,
99    sessions: Arc<RwLock<MailSessions>>,
100    state: Arc<RwLock<State>>,
101) {
102    info!("[mail] Checking mails...");
103
104    let session = match time::timeout(time::Duration::from_secs(10), cfg.build_session()).await {
105        Ok(session) => session,
106        Err(e) => {
107            warn!("[mail] Timeout when connecting to mail server: {e}.");
108            return;
109        }
110    };
111
112    if let Err(e) = session {
113        warn!("[mail] Failed to connect to mail server: {e}.");
114        return;
115    }
116
117    let session = Arc::new(RwLock::new(session.unwrap()));
118    sessions
119        .write()
120        .await
121        .insert(cfg.email.clone(), session.clone());
122
123    info!("[mail] Connected to {}.", &cfg.email);
124
125    let mail = pull_mails(&session).await;
126    if mail.is_err() {
127        warn!("[mail] <{}> {}", cfg.email, mail.unwrap_err());
128        return;
129    }
130
131    let mail = mail.unwrap();
132
133    let mut state = state.write().await;
134
135    if mail.date > state.date {
136        state.date = mail.date;
137        info!("[mail] New mail detected!");
138        let message = format!("{} 收到新邮件!\n{}", &cfg.email, mail.subject);
139        if let Some(users) = &cfg.notify_users {
140            for user in users {
141                bot.send_private_msg(user.to_owned(), message.clone());
142            }
143        }
144        if let Some(groups) = &cfg.notify_groups {
145            for group in groups {
146                bot.send_private_msg(group.to_owned(), message.clone());
147            }
148        }
149    } else {
150        info!("[mail] No new mail detected.");
151    }
152
153    if let Err(e) = session.write().await.logout().await {
154        warn!("[mail] Error when logging out: {e}.");
155    } else {
156        info!("[mail] Logged out from {}.", &cfg.email);
157    }
158    sessions.write().await.remove(&cfg.email);
159}
160
161async fn on_drop(sessions: Arc<RwLock<MailSessions>>) {
162    let mut sessions = sessions.write().await;
163    for (_, s) in sessions.iter() {
164        let mut session = s.write().await;
165        session.logout().await.unwrap();
166    }
167    sessions.clear();
168    info!("[mail] Logged out mail sessions");
169}