xmpp_client_rs/
core.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 use std::any::TypeId;
5 use std::collections::{HashMap, VecDeque};
6 use std::convert::{TryFrom, TryInto};
7 use std::fmt::{self, Debug, Display};
8 use std::future::Future;
9 use std::str::FromStr;
10 use std::sync::{Arc, Mutex};
11 use std::time::Instant;
12 
13 use anyhow::Result;
14 use chrono::{DateTime, FixedOffset};
15 use futures::sink::SinkExt;
16 use futures::stream::StreamExt;
17 use rand::distr::Distribution;
18 use rand::Rng;
19 use secrecy::ExposeSecret;
20 use tokio::runtime::Runtime as TokioRuntime;
21 use tokio::sync::{mpsc, RwLock, RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard};
22 use uuid::Uuid;
23 
24 use xmpp_parsers::caps::{self, Caps};
25 use xmpp_parsers::delay::Delay;
26 use xmpp_parsers::hashes as xmpp_hashes;
27 use xmpp_parsers::iq::{Iq, IqType};
28 use xmpp_parsers::legacy_omemo;
29 use xmpp_parsers::message::Message as XmppParsersMessage;
30 use xmpp_parsers::muc::Muc;
31 use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
32 use xmpp_parsers::pubsub::event::PubSubEvent;
33 use xmpp_parsers::stanza_error::StanzaError;
34 use xmpp_parsers::{iq, presence};
35 use jid::{BareJid, FullJid, Jid};
36 use minidom::Element;
37
38 use crate::account::{Account, ConnectionInfo, Password};
39 use crate::async_iq::{IqFuture, PendingIqState};
40 use crate::config::Config;
41 use crate::crypto::CryptoEngine;
42 use crate::mods;
43 use crate::message::Message;
44 
/// Version of this crate, taken from `CARGO_PKG_VERSION` at compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
46 
/// Events exchanged on the internal event channel.
///
/// Each event is first offered to every registered mod through `ModTrait::on_event` and is
/// then processed by `Aparte::handle_event`. Variants without a dedicated arm in
/// `handle_event` are only seen by mods.
#[derive(Debug, Clone)]
pub enum Event {
    /// Scheduled when the event loop starts; handled by calling `Aparte::start`.
    Start,
    /// Request to open a connection with the given connection info and password.
    Connect(ConnectionInfo, Password),
    /// The stream for this account came online (not a resumption); carries the bound JID.
    Connected(Account, Jid),
    /// The connection of this account was lost; the string is the formatted error.
    Disconnected(Account, String),
    /// Authentication failed for this account; the string is the formatted error.
    AuthError(Account, String),
    /// A stanza was received for this account.
    Stanza(Account, Element),
    /// A parsed XMPP message to dispatch to the mod best able to handle it.
    RawMessage {
        account: Account,
        message: XmppParsersMessage,
        /// Delay information, passed through to the mods.
        delay: Option<Delay>,
        /// Passed through to the handling mod.
        // NOTE(review): presumably true when the message comes from an archive — confirm.
        archive: bool,
    },
    /// Outgoing message; encrypted when a crypto engine is registered for the recipient,
    /// then sent on the account's connection.
    SendMessage(Account, Message),
    /// A message made available to mods (scheduled for every `SendMessage` and for the
    /// English text of unmatched iq errors).
    Message(Option<Account>, Message),
    /// Not handled by the core handler in this file.
    // NOTE(review): presumably opens a one-to-one chat with `contact` — confirm.
    Chat {
        account: Account,
        contact: BareJid,
    },
    /// Request to join `channel`; the core sends the MUC presence and schedules `Joined`.
    Join {
        account: FullJid,
        /// Room to join; when it is a bare JID the account's node is used as resource.
        channel: Jid,
        /// Forwarded unchanged to the resulting `Joined` event.
        user_request: bool,
    },
    /// Scheduled right after the join presence for `channel` has been sent.
    Joined {
        account: FullJid,
        channel: FullJid,
        user_request: bool,
    },
    /// An iq that matched no pending request and is neither a result nor an error.
    Iq(Account, iq::Iq),
    /// Not produced by the code visible in this file.
    // NOTE(review): presumably the result of an iq identified by `uuid` — confirm.
    IqResult {
        account: Account,
        uuid: Uuid,
        from: Option<Jid>,
        payload: Option<Element>,
    },
    /// Not produced by the code visible in this file.
    // NOTE(review): presumably the error answer to an iq identified by `uuid` — confirm.
    IqError {
        account: Account,
        uuid: Uuid,
        from: Option<Jid>,
        payload: StanzaError,
    },
    /// Not handled by the core handler in this file.
    // NOTE(review): presumably the discovered features of the account's server — confirm.
    Disco(Account, Vec<String>),
    /// Not handled by the core handler in this file; carries a PubSub event and its sender.
    PubSub {
        account: Account,
        from: Option<Jid>,
        event: PubSubEvent,
    },
    /// A presence stanza was received and parsed for this account.
    Presence(Account, presence::Presence),
    /// Not handled by the core handler in this file.
    // NOTE(review): window-related, semantics to be confirmed.
    Win(String),
    /// Not handled by the core handler in this file.
    // NOTE(review): window-related, semantics to be confirmed.
    Close(String),
    /// Not handled by the core handler in this file.
    // NOTE(review): presumably a terminal resize notification — confirm.
    WindowChange,
    /// Not handled by the core handler in this file.
    // NOTE(review): presumably requests the history of channel `jid` starting at `from`.
    LoadChannelHistory {
        account: Account,
        jid: BareJid,
        from: Option<DateTime<FixedOffset>>,
    },
    /// Not handled by the core handler in this file.
    // NOTE(review): presumably requests the chat history with `contact` starting at `from`.
    LoadChatHistory {
        account: Account,
        contact: BareJid,
        from: Option<DateTime<FixedOffset>>,
    },
    /// Makes `Aparte::handle_event` return `Err(())`; handled with priority by the event loop.
    Quit,
    /// Not handled by the core handler in this file.
    ResetCompletion,
    /// Handled with priority by the event loop; no dedicated arm in the core handler.
    ChangeWindow(String),
    /// Not handled by the core handler in this file.
    // NOTE(review): presumably a room subject keyed by language — confirm.
    Subject(Account, Jid, HashMap<String, String>),
}
115 
/// Closed set of mods that can be registered on [`Aparte`]; each variant wraps the
/// concrete mod type it dispatches to.
pub enum Mod {
    /// Wraps `mods::disco::DiscoMod`.
    Disco(mods::disco::DiscoMod),
}
119 
120 macro_rules! from_mod {
121     ($enum:ident, $type:path) => {
122         impl<'a> From<&'a Mod> for &'a $type {
123             fn from(r#mod: &'a Mod) -> &'a $type {
124                 match r#mod {
125                     Mod::$enum(r#mod) => r#mod,
126                     _ => unreachable!(),
127                 }
128             }
129         }
130 
131         impl<'a> From<&'a mut Mod> for &'a mut $type {
132             fn from(r#mod: &'a mut Mod) -> &'a mut $type {
133                 match r#mod {
134                     Mod::$enum(r#mod) => r#mod,
135                     _ => unreachable!(),
136                 }
137             }
138         }
139     };
140 }
141 
142 from_mod!(Disco, mods::disco::DiscoMod);
143 
/// Interface implemented by every mod registered on [`Aparte`].
pub trait ModTrait: Display {
    /// Called once for each registered mod by `Aparte::init`; an `Err` aborts the
    /// initialisation.
    fn init(&mut self, aparte: &mut Aparte) -> Result<(), ()>;
    /// Called for every event, before the core handles it.
    fn on_event(&mut self, aparte: &mut Aparte, event: &Event);
    /// Return whether this message can be handled.
    /// 0 means no, 1 means definitely yes.
    ///
    /// The message is handed to the mod returning the highest strictly positive score.
    /// The default implementation returns 0.
    fn can_handle_xmpp_message(
        &mut self,
        _aparte: &mut Aparte,
        _account: &Account,
        _message: &XmppParsersMessage,
        _delay: &Option<Delay>,
    ) -> f64 {
        0f64
    }

    /// Handle message
    ///
    /// Only called on the mod selected through `can_handle_xmpp_message`. The default
    /// implementation does nothing.
    fn handle_xmpp_message(
        &mut self,
        _aparte: &mut Aparte,
        _account: &Account,
        _message: &XmppParsersMessage,
        _delay: &Option<Delay>,
        _archive: bool,
    ) {
    }
}
170 
171 impl ModTrait for Mod {
172     fn init(&mut self, aparte: &mut Aparte) -> Result<(), ()> {
173         match self {
174             Mod::Disco(r#mod) => r#mod.init(aparte),
175         }
176     }
177 
178     fn on_event(&mut self, aparte: &mut Aparte, event: &Event) {
179         match self {
180             Mod::Disco(r#mod) => r#mod.on_event(aparte, event),
181         }
182     }
183 
184     fn can_handle_xmpp_message(
185         &mut self,
186         aparte: &mut Aparte,
187         account: &Account,
188         message: &XmppParsersMessage,
189         delay: &Option<Delay>,
190     ) -> f64 {
191         match self {
192             Mod::Disco(r#mod) => r#mod.can_handle_xmpp_message(aparte, account, message, delay),
193         }
194     }
195 
196     fn handle_xmpp_message(
197         &mut self,
198         aparte: &mut Aparte,
199         account: &Account,
200         message: &XmppParsersMessage,
201         delay: &Option<Delay>,
202         archive: bool,
203     ) {
204         match self {
205             Mod::Disco(r#mod) => {
206                 r#mod.handle_xmpp_message(aparte, account, message, delay, archive)
207             }
208         }
209     }
210 }
211 
212 impl fmt::Debug for Mod {
213     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
214         match self {
215             Mod::Disco(_) => f.write_str("Mod::Disco"),
216         }
217     }
218 }
219 
220 impl Display for Mod {
221     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
222         match self {
223             Mod::Disco(r#mod) => r#mod.fmt(f),
224         }
225     }
226 }
227 
/// Sending side of an account connection registered on [`Aparte`].
pub struct Connection {
    /// Channel drained by the writer task spawned in `Aparte::connect`, which forwards
    /// each element to the XMPP stream.
    pub sink: mpsc::UnboundedSender<Element>,
}
231 
/// Log an informational message through the `log` crate and through `$aparte.log(..)`.
///
/// Usage: `info!(aparte, "format string")` or `info!(aparte, "format string", args...)`.
///
/// The message is formatted exactly once and the resulting string is shared by both
/// sinks, so format arguments are evaluated a single time (as `error!` already does).
/// The macro evaluates to whatever `$aparte.log(..)` returns.
#[macro_export]
macro_rules! info(
    ($aparte:ident, $msg:literal $(, $($args:tt)*)?) => ({
        let message = format!($msg $(, $($args)*)?);
        ::log::info!("{}", message);
        $aparte.log(message)
    });
);
243 
/// Report an error through the `log` crate and through `$aparte.log(..)`.
///
/// Usage: `error!(aparte, err, "format string")` or
/// `error!(aparte, err, "format string", args...)`.
///
/// The formatted message is attached to `$err` with `.context(..)` and that result is
/// logged with `{:?}` at error level; `$aparte.log(..)` only receives the formatted
/// message. The macro evaluates to whatever `$aparte.log(..)` returns.
#[macro_export]
macro_rules! error(
    ($aparte:ident, $err:ident, $msg:literal $(, $($args:tt)*)?) => ({
        let description = format!($msg $(, $($args)*)?);
        ::log::error!("{:?}", $err.context(description.clone()));
        $aparte.log(description)
    });
);
257 
/// Core state: registered mods, account connections and the internal channels driving
/// the event loop.
pub struct Aparte {
    /// Registered mods, keyed by the `TypeId` of the concrete mod type.
    mods: Arc<HashMap<TypeId, RwLock<Mod>>>,
    /// Sending side of each account connection.
    connections: HashMap<Account, Connection>,
    /// Account of the most recently added connection.
    current_connection: Option<Account>,
    /// Sender of the internal event channel (cloned into the connection reader tasks).
    event_tx: mpsc::UnboundedSender<Event>,
    /// Receiver of the internal event channel; taken by the event loop when it starts.
    event_rx: Option<mpsc::UnboundedReceiver<Event>>,
    /// Sender of the outgoing stanza channel.
    send_tx: mpsc::UnboundedSender<(Account, Element)>,
    /// Receiver of the outgoing stanza channel; taken by the event loop when it starts.
    send_rx: Option<mpsc::UnboundedReceiver<(Account, Element)>>,
    /// State of iq requests, keyed by the UUID used as iq id.
    pending_iq: Arc<Mutex<HashMap<Uuid, PendingIqState>>>,
    /// Crypto engines, keyed by account and bare JID of the other party.
    crypto_engines: Arc<Mutex<HashMap<(Account, BareJid), CryptoEngine>>>,
    /// Aparté main configuration
    pub config: Config,
}
271 
272 impl Aparte {
273     pub fn new(config: Config) -> Result<Self> {
274         log::debug!("Loading aparté");
275 
276         let (event_tx, event_rx) = mpsc::unbounded_channel();
277         let (send_tx, send_rx) = mpsc::unbounded_channel();
278 
279         let aparte = Self {
280             mods: Arc::new(HashMap::new()),
281             connections: HashMap::new(),
282             current_connection: None,
283             event_tx,
284             event_rx: Some(event_rx),
285             send_tx,
286             send_rx: Some(send_rx),
287             config: config.clone(),
288             pending_iq: Arc::new(Mutex::new(HashMap::new())),
289             crypto_engines: Arc::new(Mutex::new(HashMap::new())),
290         };
291 
292         Ok(aparte)
293     }
294 
295     pub fn add_mod(&mut self, r#mod: Mod) {
296         log::info!("Add mod `{}`", r#mod);
297         let mods = Arc::get_mut(&mut self.mods).unwrap();
298         // TODO ensure mod is not inserted twice
299         match r#mod {
300             Mod::Disco(r#mod) => {
301                 mods.insert(
302                     TypeId::of::<mods::disco::DiscoMod>(),
303                     RwLock::new(Mod::Disco(r#mod)),
304                 );
305             }
306         }
307     }
308 
309     pub fn add_connection(&mut self, account: Account, sink: mpsc::UnboundedSender<Element>) {
310         let connection = Connection { sink };
311 
312         self.connections.insert(account.clone(), connection);
313         self.current_connection = Some(account);
314     }
315 
316     pub fn init(&mut self) -> Result<(), ()> { 
317         let mods = self.mods.clone();
318         for (_, r#mod) in mods.iter() {
319             r#mod.try_write().unwrap().init(self)?;
320         }
321 
322         Ok(())
323     }
324 
325     pub fn run(mut self) {
326         let rt = TokioRuntime::new().unwrap();
327 
328        //  rt.spawn({
329        //      let tx = self.event_tx.clone();
330        //      async move {
331        //          let mut sigwinch = unix::signal(unix::SignalKind::window_change()).unwrap();
332        //          loop {
333        //              sigwinch.recv().await;
334        //              if let Err(err) = tx.send(Event::WindowChange) {
335        //                  log::error!("Cannot send signal to internal channel: {}", err);
336        //                  break;
337        //              }
338        //          }
339        //      }
340        //  });
341 
342         let local_set = tokio::task::LocalSet::new();
343         local_set.block_on(&rt, async move {
344             self.schedule(Event::Start);
345             let mut event_rx = self.event_rx.take().unwrap();
346             let mut send_rx = self.send_rx.take().unwrap();
347 
348             let mut last_events = VecDeque::new();
349             'main: loop {
350                 let mut events_buf = Vec::new();
351                 tokio::select! {
352                     count = event_rx.recv_many(&mut events_buf, 1000) => match count {
353                         0 => {
354                             log::error!("Broken event channel");
355                             break
356                         }
357                         events_count => {
358                             // Ensure all key events are handled first
359                             let (priority_events, filtered_events): (VecDeque<_>, VecDeque<_>) = events_buf.drain(..).partition(|event| matches!(event,
360                                                                                                  | Event::ChangeWindow(_)
361                                                                                                  | Event::Quit));
362 
363                             log::trace!("Event loop got {} new events ({} priority); have {} last events", events_count, priority_events.len(), last_events.len());
364 
365                             last_events.extend(filtered_events);
366                             // Handle priority events first
367                             for event in priority_events {
368                                 if self.handle_event(event).is_err() {
369                                     break 'main
370                                 }
371                             }
372                             let mut start = Instant::now();
373                             while let Some(event) = last_events.pop_front() {
374                                 if self.handle_event(event).is_err() {
375                                     break 'main;
376                                 }
377                             }
378                             log::trace!("Event loop, delayed handling of {} events", last_events.len());
379                         },
380                     },
381                     account_and_stanza = send_rx.recv() => match account_and_stanza {
382                         Some((account, stanza)) => self.send_stanza(account, stanza),
383                         None => {
384                             log::error!("Broken send channel");
385                             break;
386                         }
387                     }
388                 };
389             }
390         });
391     }
392 
393    pub async fn run_async(&mut self) {
394       self.schedule(Event::Start);
395       let mut event_rx = self.event_rx.take().unwrap();
396       let mut send_rx = self.send_rx.take().unwrap();
397
398       let mut last_events = VecDeque::new();
399       loop {
400          let mut events_buf = Vec::new();
401          tokio::select! {
402             count = event_rx.recv_many(&mut events_buf, 1000) => match count {
403                0 => {
404                    log::error!("Broken event channel");
405                    break
406                }
407                events_count => {
408                    // Ensure all key events are handled first
409                    let (priority_events, filtered_events): (VecDeque<_>, VecDeque<_>) = events_buf.drain(..).partition(|event| matches!(event,
410                                                                          | Event::ChangeWindow(_)
411                                                                          | Event::Quit));
412
413                    log::trace!("Event loop got {} new events ({} priority); have {} last events", events_count, priority_events.len(), last_events.len());
414
415                    last_events.extend(filtered_events);
416                    // Handle priority events first
417                    for event in priority_events {
418                       if self.handle_event(event).is_err() {
419                          break
420                       }
421                    }
422                    while let Some(event) = last_events.pop_front() {
423                       if self.handle_event(event).is_err() {
424                          break;
425                       }
426                    }
427                    log::trace!("Event loop, delayed handling of {} events", last_events.len());
428                },
429             },
430             account_and_stanza = send_rx.recv() => match account_and_stanza {
431                Some((account, stanza)) => self.send_stanza(account, stanza),
432                None => {
433                    log::error!("Broken send channel");
434                    break;
435                }
436             }
437          };
438       }
439    }
440
441     pub fn start(&mut self) {
442         for (name, account) in self.config.accounts.clone() {
443             if account.autoconnect {
444                //self.schedule(Event::Connect(account, account.password.expect("password not set").clone()));
445                self.connect(&account, account.password.clone().expect("password not set"));
446             }
447         }
448     }
449 
450     fn send_stanza(&mut self, account: Account, stanza: Element) {
451         let mut raw = Vec::<u8>::new();
452         stanza.write_to(&mut raw).unwrap();
453         log::debug!("SEND: {}", String::from_utf8(raw).unwrap());
454         match self.connections.get_mut(&account) {
455             Some(connection) => {
456                 if let Err(e) = connection.sink.send(stanza) {
457                     log::warn!("Cannot send stanza: {}", e);
458                 }
459             }
460             None => {
461                 log::warn!("No connection found for {}", account);
462             }
463         }
464     }
465 
466     pub fn connect(&mut self, connection_info: &ConnectionInfo, password: Password) {
467         let account: Account = match Jid::from_str(&connection_info.jid).map(Jid::try_into_full) {
468             Ok(Ok(full_jid)) => full_jid,
469             Ok(Err(bare_jid)) => {
470                 let rand_string: String = rand::rng()
471                     .sample_iter(&rand::distr::Alphanumeric)
472                     .take(5)
473                     .map(char::from)
474                     .collect();
475                 bare_jid
476                     .with_resource_str(&format!("aparte_{rand_string}"))
477                     .unwrap()
478             }
479             Err(err) => {
480                 self.log(format!(
481                     "Cannot connect as {}: {}",
482                     connection_info.jid, err
483                 ));
484                 return;
485             }
486         };
487 
488         self.log(format!("Connecting as {account}"));
489         let config = tokio_xmpp::AsyncConfig {
490             jid: Jid::from(account.clone()),
491             password: password.expose_secret().to_string(),
492             server: match (&connection_info.server, &connection_info.port) {
493                 (Some(server), Some(port)) => tokio_xmpp::starttls::ServerConfig::Manual {
494                     host: server.clone(),
495                     port: *port,
496                 },
497                 (Some(server), None) => tokio_xmpp::starttls::ServerConfig::Manual {
498                     host: server.clone(),
499                     port: 5222,
500                 },
501                 (None, Some(port)) => tokio_xmpp::starttls::ServerConfig::Manual {
502                     host: account.domain().to_string(),
503                     port: *port,
504                 },
505                 (None, None) => tokio_xmpp::starttls::ServerConfig::UseSrv,
506             },
507         };
508         log::debug!("Connect with config: {config:?}");
509         let mut client = tokio_xmpp::AsyncClient::new_with_config(config);
510 
511         client.set_reconnect(true);
512 
513         let (connection_channel, mut rx) = mpsc::unbounded_channel();
514 
515         self.add_connection(account.clone(), connection_channel);
516 
517         let (mut writer, mut reader) = client.split();
518         // XXX could use self.rt.spawn if client was impl Send
519         tokio::spawn(async move {
520             while let Some(element) = rx.recv().await {
521                 if let Err(err) = writer.send(tokio_xmpp::Packet::Stanza(element)).await {
522                     log::error!("cannot send Stanza to internal channel: {}", err);
523                     break;
524                 }
525             }
526         });
527 
528         let event_tx = self.event_tx.clone();
529 
530         let reconnect = true;
531         tokio::spawn(async move {
532             while let Some(event) = reader.next().await {
533                 log::debug!("XMPP Event: {:?}", event);
534                 match event {
535                     tokio_xmpp::Event::Disconnected(tokio_xmpp::Error::Auth(e)) => {
536                         if let Err(err) =
537                             event_tx.send(Event::AuthError(account.clone(), format!("{e}")))
538                         {
539                             log::error!("Cannot send event to internal channel: {}", err);
540                         };
541                         break;
542                     }
543                     tokio_xmpp::Event::Disconnected(e) => {
544                         if let Err(err) =
545                             event_tx.send(Event::Disconnected(account.clone(), format!("{e}")))
546                         {
547                             log::error!("Cannot send event to internal channel: {}", err);
548                         };
549                         if !reconnect {
550                             break;
551                         }
552                     }
553                     tokio_xmpp::Event::Online {
554                         bound_jid: jid,
555                         resumed: true,
556                     } => {
557                         log::debug!("Reconnected to {}", jid);
558                     }
559                     tokio_xmpp::Event::Online {
560                         bound_jid: jid,
561                         resumed: false,
562                     } => {
563                         if let Err(err) = event_tx.send(Event::Connected(account.clone(), jid)) {
564                             log::error!("Cannot send event to internal channel: {}", err);
565                             break;
566                         }
567                     }
568                     tokio_xmpp::Event::Stanza(stanza) => {
569                         log::debug!("RECV: {}", String::from(&stanza));
570                         if let Err(err) = event_tx.send(Event::Stanza(account.clone(), stanza)) {
571                             log::error!("Cannot send stanza to internal channel: {}", err);
572                             break;
573                         }
574                     }
575                 }
576             }
577         });
578     }
579 
580     pub fn handle_event(&mut self, event: Event) -> Result<(), ()> {
581         log::trace!("Handle event: {:?}", event);
582
583         let before = Instant::now();
584 
585         {
586             let mods = self.mods.clone();
587             for (_, r#mod) in mods.iter() {
588                 let before = Instant::now();
589                 r#mod.try_write().unwrap().on_event(self, &event);
590                 log::trace!("{:?} handled event in {:.2?}", r#mod, before.elapsed());
591             }
592         }
593 
594         match event {
595             Event::Start => {
596                 self.start();
597             }
598             Event::SendMessage(account, message) => {
599                 self.schedule(Event::Message(Some(account.clone()), message.clone()));
600 
601                 // Encrypt if required
602                 let encryption = message.encryption_recipient().and_then(|recipient| {
603                     let mut crypto_engines = self.crypto_engines.lock().unwrap();
604                     crypto_engines
605                         .get_mut(&(account.clone(), recipient))
606                         .map(|crypto_engine| crypto_engine.encrypt(self, &account, &message))
607                 });
608 
609                 match encryption {
610                     Some(Ok(encrypted_message)) => self.send(&account, encrypted_message),
611                     Some(Err(e)) => {
612                         log::error!("Cannot encrypt message (TODO print error in UI): {e}")
613                     }
614                     None => self.send(&account, message),
615                 }
616             }
617             Event::Connect(account, password) => {
618                 self.connect(&account, password);
619             }
620             Event::Connected(account, _) => {
621                 self.log(format!("Connected as {}", account));
622                 let mut presence = Presence::new(PresenceType::None);
623                 presence.show = Some(PresenceShow::Chat);
624 
625                 let disco = self.get_mod::<mods::disco::DiscoMod>().get_disco();
626                 let disco = caps::compute_disco(&disco);
627                 let verification_string =
628                     caps::hash_caps(&disco, xmpp_hashes::Algo::Blake2b_512).unwrap();
629                 let caps = Caps::new("aparté", verification_string);
630                 presence.add_payload(caps);
631 
632                 self.send(&account, presence);
633             }
634             Event::Disconnected(account, err) => {
635                 self.log(format!("Connection lost for {}: {}", account, err));
636             }
637             Event::AuthError(account, err) => {
638                 self.log(format!("Authentication error for {}: {}", account, err));
639             }
640             Event::Stanza(account, stanza) => {
641                 self.handle_stanza(account, stanza);
642             }
643             Event::RawMessage {
644                 account,
645                 message,
646                 delay,
647                 archive,
648             } => {
649                 self.handle_xmpp_message(account, message, delay, archive);
650             }
651             Event::Join {
652                 account,
653                 channel,
654                 user_request,
655             } => {
656                 let to = match channel.try_as_full() {
657                     Ok(full_jid) => full_jid.clone(),
658                     Err(bare_jid) => bare_jid
659                         .with_resource_str(account.node().as_ref().unwrap())
660                         .unwrap(),
661                 };
662                 let from: Jid = account.clone().into();
663 
664                 let mut presence = Presence::new(PresenceType::None);
665                 presence = presence.with_to(Jid::from(to.clone()));
666                 presence = presence.with_from(from);
667                 presence.add_payload(Muc::new());
668                 self.send(&account, presence);
669 
670                 // Successful join
671                 self.log(format!("Joined {}", channel));
672                 self.schedule(Event::Joined {
673                     account: account.clone(),
674                     channel: to,
675                     user_request,
676                 });
677             }
678             Event::Quit => {
679                 return Err(());
680             }
681             _ => {}
682         }
683 
684         log::trace!("Fully handled event in {:.2?}", before.elapsed());
685         Ok(())
686     }
687 
688     fn handle_stanza(&mut self, account: Account, stanza: Element) {
689         match stanza.name() {
690             "iq" => match Iq::try_from(stanza.clone()) {
691                 Ok(iq) => self.handle_iq(account, iq),
692                 Err(err) => {
693                     log::error!("{}", err);
694                     if let Some(id) = stanza.attr("id") {
695                         self.errored_iq(id, err.into());
696                     }
697                 }
698             },
699             "presence" => match Presence::try_from(stanza) {
700                 Ok(presence) => self.schedule(Event::Presence(account, presence)),
701                 Err(err) => log::error!("{}", err),
702             },
703             "message" => match XmppParsersMessage::try_from(stanza) {
704                 Ok(message) => self.handle_xmpp_message(account, message, None, false),
705                 Err(err) => log::error!("{}", err),
706             },
707             _ => log::error!("unknown stanza: {}", stanza.name()),
708         }
709     }
710 
711     fn handle_xmpp_message(
712         &mut self,
713         account: Account,
714         message: XmppParsersMessage,
715         delay: Option<Delay>,
716         archive: bool,
717     ) {
718         let mut best_match = 0f64;
719         let mut matched_mod = None;
720         let mut message = message;
721 
722         let encryption_ns = message
723             .payloads
724             .iter()
725             .find_map(|p| {
726                 xmpp_parsers::eme::ExplicitMessageEncryption::try_from((*p).clone())
727                     .ok()
728                     .map(|eme| eme.namespace)
729             })
730             .or(message.payloads.iter().find_map(|p| {
731                 legacy_omemo::Encrypted::try_from((*p).clone())
732                     .ok()
733                     .map(|_| xmpp_parsers::ns::LEGACY_OMEMO.to_string())
734             }));
735 
736         // Decrypt if required
737         // TODO EME can't be required
738         if let (Some(encryption_ns), Some(from)) = (encryption_ns, message.from.clone()) {
739             let mut crypto_engines = self.crypto_engines.lock().unwrap();
740             if let Some(crypto_engine) = crypto_engines.get_mut(&(account.clone(), from.to_bare()))
741             {
742                 if encryption_ns == crypto_engine.ns() {
743                     message = match crypto_engine.decrypt(self, &account, &message) {
744                         Ok(message) => message,
745                         Err(err) => {
746                             log::error!(
747                                 "Cannot decrypt message with {}: {}",
748                                 crypto_engine.ns(),
749                                 err
750                             );
751                             message
752                         }
753                     };
754                 } else {
755                     log::warn!(
756                         "Incompatible crypto engine found for {:?} (found {} expecting {})",
757                         message.from,
758                         crypto_engine.ns(),
759                         encryption_ns
760                     );
761                 }
762             } else {
763                 log::warn!(
764                     "No crypto engine found for {:?} (encrypted with {})",
765                     message.from,
766                     encryption_ns
767                 );
768             }
769         }
770 
771         let mods = self.mods.clone();
772         for (_, r#mod) in mods.iter() {
773             let message_match = r#mod
774                 .try_write()
775                 .unwrap()
776                 .can_handle_xmpp_message(self, &account, &message, &delay);
777             if message_match > best_match {
778                 matched_mod = Some(r#mod);
779                 best_match = message_match;
780             }
781         }
782 
783         if let Some(r#mod) = matched_mod {
784             log::debug!("Handling xmpp message by {:?}", r#mod);
785             r#mod
786                 .try_write()
787                 .unwrap()
788                 .handle_xmpp_message(self, &account, &message, &delay, archive);
789         } else {
790             log::info!("Don't know how to handle message: {:?}", message);
791         }
792     }
793 
794     fn handle_iq(&mut self, account: Account, iq: Iq) {
795         // Try to match to pending Iq
796         if let Ok(uuid) = Uuid::from_str(&iq.id) {
797             let state = self.pending_iq.lock().unwrap().remove(&uuid);
798             if let Some(state) = state {
799                 match state {
800                     PendingIqState::Waiting(waker) => {
801                         // XXX dead lock
802                         self.pending_iq
803                             .lock()
804                             .unwrap()
805                             .insert(uuid, PendingIqState::Finished(iq));
806                         if let Some(waker) = waker {
807                             waker.wake();
808                         }
809                     }
810                     PendingIqState::Errored(_err) => {
811                         log::info!("Received multiple response for Iq: {}", uuid);
812                         // Insert valid iq instead
813                         self.pending_iq
814                             .lock()
815                             .unwrap()
816                             .insert(uuid, PendingIqState::Finished(iq));
817                     }
818                     PendingIqState::Finished(iq) => {
819                         log::info!("Received multiple response for Iq: {}", uuid);
820                         // Reinsert original result
821                         self.pending_iq
822                             .lock()
823                             .unwrap()
824                             .insert(uuid, PendingIqState::Finished(iq));
825                     }
826                 }
827                 return;
828             }
829         }
830 
831         // Unknown iq
832         match iq.payload {
833             IqType::Error(payload) => {
834                 if let Some(text) = payload.texts.get("en") {
835                     let message = Message::log(text.to_string());
836                     self.schedule(Event::Message(Some(account.clone()), message));
837                 }
838             }
839             IqType::Result(payload) => {
840                 log::info!("Received unexpected Iq result {:?}", payload);
841             }
842             _ => {
843                 self.schedule(Event::Iq(account, iq));
844             }
845         }
846     }
847 
848     fn errored_iq(&mut self, id: &str, err: anyhow::Error) {
849         if let Ok(uuid) = Uuid::from_str(id) {
850             let state = self.pending_iq.lock().unwrap().remove(&uuid);
851             if let Some(state) = state {
852                 match state {
853                     PendingIqState::Waiting(waker) => {
854                         // XXX dead lock
855                         self.pending_iq
856                             .lock()
857                             .unwrap()
858                             .insert(uuid, PendingIqState::Errored(err));
859                         if let Some(waker) = waker {
860                             waker.wake();
861                         }
862                     }
863                     PendingIqState::Errored(err) => {
864                         log::warn!("Received multiple response for Iq: {}", uuid);
865                         // Reinsert original result
866                         self.pending_iq
867                             .lock()
868                             .unwrap()
869                             .insert(uuid, PendingIqState::Errored(err));
870                     }
871                     PendingIqState::Finished(iq) => {
872                         log::warn!("Received multiple response for Iq: {}", uuid);
873                         // Reinsert original result
874                         self.pending_iq
875                             .lock()
876                             .unwrap()
877                             .insert(uuid, PendingIqState::Finished(iq));
878                     }
879                 }
880             }
881         }
882     }
883 
884     // TODO maybe use From<>
885     pub fn proxy(&self) -> AparteAsync {
886         AparteAsync {
887             current_connection: self.current_connection.clone(),
888             event_tx: self.event_tx.clone(),
889             send_tx: self.send_tx.clone(),
890             pending_iq: self.pending_iq.clone(),
891             config: self.config.clone(),
892             crypto_engines: self.crypto_engines.clone(),
893         }
894     }
895 
896     pub fn spawn<F>(future: F)
897     where
898         F: Future + Send + 'static,
899         F::Output: Send + 'static,
900     {
901         tokio::spawn(future);
902     }
903 
904     // Common function for AparteAsync and Aparte, maybe share it in Trait
905     pub fn add_crypto_engine(
906         &mut self,
907         account: &Account,
908         recipient: &BareJid,
909         crypto_engine: CryptoEngine,
910     ) {
911         let mut crypto_engines = self.crypto_engines.lock().unwrap();
912         crypto_engines.insert((account.clone(), recipient.clone()), crypto_engine);
913     }
914 
915     pub fn send<T>(&mut self, account: &Account, element: T)
916     where
917         T: TryInto<Element> + Debug,
918     {
919         match element.try_into() {
920             Ok(stanza) => self.send_tx.send((account.clone(), stanza)).unwrap(),
921             Err(_e) => {
922                 log::error!("Cannot convert to element");
923             }
924         };
925     }
926 
927     pub fn schedule(&mut self, event: Event) {
928         log::trace!("Schedule event {:?}", event);
929         self.event_tx.send(event).unwrap();
930     }
931 
932     pub fn log<T: ToString>(&mut self, message: T) {
933         let message = Message::log(message.to_string());
934         self.schedule(Event::Message(None, message));
935     }
936 
937     pub fn error<T: Display>(&mut self, message: T, err: anyhow::Error) {
938         let message = Message::log(format!("{}: {:#}", message, err));
939         self.schedule(Event::Message(None, message));
940     }
941 
942     pub fn get_mod<'a, T>(&'a self) -> RwLockReadGuard<'a, T>
943     where
944         T: 'static,
945         for<'b> &'b T: From<&'b Mod>,
946     {
947         match self.mods.get(&TypeId::of::<T>()) {
948             Some(r#mod) => RwLockReadGuard::map(r#mod.try_read().unwrap(), |m| m.into()),
949             None => unreachable!(),
950         }
951     }
952 
953     #[allow(unused)]
954     pub fn get_mod_mut<'a, T>(&'a self) -> RwLockMappedWriteGuard<'a, T>
955     where
956         T: 'static,
957         for<'b> &'b mut T: From<&'b mut Mod>,
958     {
959         match self.mods.get(&TypeId::of::<T>()) {
960             Some(r#mod) => RwLockWriteGuard::map(r#mod.try_write().unwrap(), |m| m.into()),
961             None => unreachable!(),
962         }
963     }
964 
965     pub fn current_account(&self) -> Option<Account> {
966         self.current_connection.clone()
967     }
968 }
969 
970 #[derive(Clone)]
971 pub struct AparteAsync {
972     current_connection: Option<Account>,
973     event_tx: mpsc::UnboundedSender<Event>,
974     send_tx: mpsc::UnboundedSender<(Account, Element)>,
975     crypto_engines: Arc<Mutex<HashMap<(Account, BareJid), CryptoEngine>>>,
976     pub(crate) pending_iq: Arc<Mutex<HashMap<Uuid, PendingIqState>>>,
977     pub config: Config,
978 }
979 
980 impl AparteAsync {
981    pub fn connect(&mut self, connection_info: &ConnectionInfo) {
982        let connection_info = connection_info.clone();
983        let password = connection_info.password.clone();
984        self.schedule(Event::Connect(connection_info, password.unwrap()));
985    }
986
987     pub fn send(&mut self, account: &Account, stanza: Element) {
988         self.send_tx.send((account.clone(), stanza)).unwrap();
989     }
990 
991     pub fn iq(&mut self, account: &Account, iq: Iq) -> IqFuture {
992         IqFuture::new(self.clone(), account, iq)
993     }
994 
995     pub fn schedule(&mut self, event: Event) {
996         self.event_tx.send(event).unwrap();
997     }
998 
999     pub fn log<T: ToString>(&mut self, message: T) {
1000         let message = Message::log(message.to_string());
1001         self.schedule(Event::Message(None, message));
1002     }
1003 
1004     pub fn error<T: Display>(&mut self, message: T, err: anyhow::Error) {
1005         let message = Message::log(format!("{}: {:#}", message, err));
1006         self.schedule(Event::Message(None, message));
1007     }
1008 
1009     pub fn current_account(&self) -> Option<Account> {
1010         self.current_connection.clone()
1011     }
1012 
1013     pub fn add_crypto_engine(
1014         &mut self,
1015         account: &Account,
1016         recipient: &BareJid,
1017         crypto_engine: CryptoEngine,
1018     ) {
1019         let mut crypto_engines = self.crypto_engines.lock().unwrap();
1020         crypto_engines.insert((account.clone(), recipient.clone()), crypto_engine);
1021     }
1022 }