1use std::any::TypeId;
5 use std::collections::{HashMap, VecDeque};
6 use std::convert::{TryFrom, TryInto};
7 use std::fmt::{self, Debug, Display};
8 use std::future::Future;
9 use std::str::FromStr;
10 use std::sync::{Arc, Mutex};
11 use std::time::Instant;
12
13 use anyhow::Result;
14 use chrono::{DateTime, FixedOffset};
15 use futures::sink::SinkExt;
16 use futures::stream::StreamExt;
17 use rand::distr::Distribution;
18 use rand::Rng;
19 use secrecy::ExposeSecret;
20 use tokio::runtime::Runtime as TokioRuntime;
21 use tokio::sync::{mpsc, RwLock, RwLockMappedWriteGuard, RwLockReadGuard, RwLockWriteGuard};
22 use uuid::Uuid;
23
24 use xmpp_parsers::caps::{self, Caps};
25 use xmpp_parsers::delay::Delay;
26 use xmpp_parsers::hashes as xmpp_hashes;
27 use xmpp_parsers::iq::{Iq, IqType};
28 use xmpp_parsers::legacy_omemo;
29 use xmpp_parsers::message::Message as XmppParsersMessage;
30 use xmpp_parsers::muc::Muc;
31 use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
32 use xmpp_parsers::pubsub::event::PubSubEvent;
33 use xmpp_parsers::stanza_error::StanzaError;
34 use xmpp_parsers::{iq, presence};
35 use jid::{BareJid, FullJid, Jid};
36 use minidom::Element;
37
38 use crate::account::{Account, ConnectionInfo, Password};
39 use crate::async_iq::{IqFuture, PendingIqState};
40 use crate::config::Config;
41 use crate::crypto::CryptoEngine;
42 use crate::mods;
43 use crate::message::Message;
44
45 const VERSION: &str = env!("CARGO_PKG_VERSION");
46
47 #[derive(Debug, Clone)]
48 pub enum Event {
49 Start,
50 Connect(ConnectionInfo, Password),
51 Connected(Account, Jid),
52 Disconnected(Account, String),
53 AuthError(Account, String),
54 Stanza(Account, Element),
55 RawMessage {
56 account: Account,
57 message: XmppParsersMessage,
58 delay: Option<Delay>,
59 archive: bool,
60 },
61 SendMessage(Account, Message),
62 Message(Option<Account>, Message),
63 Chat {
64 account: Account,
65 contact: BareJid,
66 },
67 Join {
68 account: FullJid,
69 channel: Jid,
70 user_request: bool,
71 },
72 Joined {
73 account: FullJid,
74 channel: FullJid,
75 user_request: bool,
76 },
77 Iq(Account, iq::Iq),
78 IqResult {
79 account: Account,
80 uuid: Uuid,
81 from: Option<Jid>,
82 payload: Option<Element>,
83 },
84 IqError {
85 account: Account,
86 uuid: Uuid,
87 from: Option<Jid>,
88 payload: StanzaError,
89 },
90 Disco(Account, Vec<String>),
91 PubSub {
92 account: Account,
93 from: Option<Jid>,
94 event: PubSubEvent,
95 },
96 Presence(Account, presence::Presence),
97 Win(String),
98 Close(String),
99 WindowChange,
100 LoadChannelHistory {
101 account: Account,
102 jid: BareJid,
103 from: Option<DateTime<FixedOffset>>,
104 },
105 LoadChatHistory {
106 account: Account,
107 contact: BareJid,
108 from: Option<DateTime<FixedOffset>>,
109 },
110 Quit,
111 ResetCompletion,
112 ChangeWindow(String),
113 Subject(Account, Jid, HashMap<String, String>),
114 }
115
116 pub enum Mod {
117 Disco(mods::disco::DiscoMod),
118 }
119
120 macro_rules! from_mod {
121 ($enum:ident, $type:path) => {
122 impl<'a> From<&'a Mod> for &'a $type {
123 fn from(r#mod: &'a Mod) -> &'a $type {
124 match r#mod {
125 Mod::$enum(r#mod) => r#mod,
126 _ => unreachable!(),
127 }
128 }
129 }
130
131 impl<'a> From<&'a mut Mod> for &'a mut $type {
132 fn from(r#mod: &'a mut Mod) -> &'a mut $type {
133 match r#mod {
134 Mod::$enum(r#mod) => r#mod,
135 _ => unreachable!(),
136 }
137 }
138 }
139 };
140 }
141
142 from_mod!(Disco, mods::disco::DiscoMod);
143
144 pub trait ModTrait: Display {
145 fn init(&mut self, aparte: &mut Aparte) -> Result<(), ()>;
146 fn on_event(&mut self, aparte: &mut Aparte, event: &Event);
147 fn can_handle_xmpp_message(
150 &mut self,
151 _aparte: &mut Aparte,
152 _account: &Account,
153 _message: &XmppParsersMessage,
154 _delay: &Option<Delay>,
155 ) -> f64 {
156 0f64
157 }
158
159 fn handle_xmpp_message(
161 &mut self,
162 _aparte: &mut Aparte,
163 _account: &Account,
164 _message: &XmppParsersMessage,
165 _delay: &Option<Delay>,
166 _archive: bool,
167 ) {
168 }
169 }
170
171 impl ModTrait for Mod {
172 fn init(&mut self, aparte: &mut Aparte) -> Result<(), ()> {
173 match self {
174 Mod::Disco(r#mod) => r#mod.init(aparte),
175 }
176 }
177
178 fn on_event(&mut self, aparte: &mut Aparte, event: &Event) {
179 match self {
180 Mod::Disco(r#mod) => r#mod.on_event(aparte, event),
181 }
182 }
183
184 fn can_handle_xmpp_message(
185 &mut self,
186 aparte: &mut Aparte,
187 account: &Account,
188 message: &XmppParsersMessage,
189 delay: &Option<Delay>,
190 ) -> f64 {
191 match self {
192 Mod::Disco(r#mod) => r#mod.can_handle_xmpp_message(aparte, account, message, delay),
193 }
194 }
195
196 fn handle_xmpp_message(
197 &mut self,
198 aparte: &mut Aparte,
199 account: &Account,
200 message: &XmppParsersMessage,
201 delay: &Option<Delay>,
202 archive: bool,
203 ) {
204 match self {
205 Mod::Disco(r#mod) => {
206 r#mod.handle_xmpp_message(aparte, account, message, delay, archive)
207 }
208 }
209 }
210 }
211
212 impl fmt::Debug for Mod {
213 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
214 match self {
215 Mod::Disco(_) => f.write_str("Mod::Disco"),
216 }
217 }
218 }
219
220 impl Display for Mod {
221 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
222 match self {
223 Mod::Disco(r#mod) => r#mod.fmt(f),
224 }
225 }
226 }
227
228 pub struct Connection {
229 pub sink: mpsc::UnboundedSender<Element>,
230 }
231
/// Emits a formatted message at info level through the `log` crate and also
/// passes it to `$aparte.log(..)`.
///
/// Usage: `info!(aparte, "literal format", args...)`.
#[macro_export]
macro_rules! info {
    ($aparte:ident, $fmt:literal) => {{
        ::log::info!($fmt);
        $aparte.log(format!($fmt))
    }};
    ($aparte:ident, $fmt:literal, $($rest:tt)*) => {{
        ::log::info!($fmt, $($rest)*);
        $aparte.log(format!($fmt, $($rest)*))
    }};
}
243
/// Reports an error: the formatted message is attached as context to `$err`
/// (via its `context` method) for the `log` crate error output, and the
/// message alone is passed to `$aparte.log(..)`.
///
/// Usage: `error!(aparte, err, "literal format", args...)`. `$err` is consumed.
#[macro_export]
macro_rules! error {
    ($aparte:ident, $err:ident, $fmt:literal) => {{
        let context = format!($fmt);
        ::log::error!("{:?}", $err.context(context.clone()));
        $aparte.log(context)
    }};
    ($aparte:ident, $err:ident, $fmt:literal, $($rest:tt)*) => {{
        let context = format!($fmt, $($rest)*);
        ::log::error!("{:?}", $err.context(context.clone()));
        $aparte.log(context)
    }};
}
257
258 pub struct Aparte {
259 mods: Arc<HashMap<TypeId, RwLock<Mod>>>,
260 connections: HashMap<Account, Connection>,
261 current_connection: Option<Account>,
262 event_tx: mpsc::UnboundedSender<Event>,
263 event_rx: Option<mpsc::UnboundedReceiver<Event>>,
264 send_tx: mpsc::UnboundedSender<(Account, Element)>,
265 send_rx: Option<mpsc::UnboundedReceiver<(Account, Element)>>,
266 pending_iq: Arc<Mutex<HashMap<Uuid, PendingIqState>>>,
267 crypto_engines: Arc<Mutex<HashMap<(Account, BareJid), CryptoEngine>>>,
268 pub config: Config,
270 }
271
272 impl Aparte {
273 pub fn new(config: Config) -> Result<Self> {
274 log::debug!("Loading aparté");
275
276 let (event_tx, event_rx) = mpsc::unbounded_channel();
277 let (send_tx, send_rx) = mpsc::unbounded_channel();
278
279 let aparte = Self {
280 mods: Arc::new(HashMap::new()),
281 connections: HashMap::new(),
282 current_connection: None,
283 event_tx,
284 event_rx: Some(event_rx),
285 send_tx,
286 send_rx: Some(send_rx),
287 config: config.clone(),
288 pending_iq: Arc::new(Mutex::new(HashMap::new())),
289 crypto_engines: Arc::new(Mutex::new(HashMap::new())),
290 };
291
292 Ok(aparte)
293 }
294
295 pub fn add_mod(&mut self, r#mod: Mod) {
296 log::info!("Add mod `{}`", r#mod);
297 let mods = Arc::get_mut(&mut self.mods).unwrap();
298 match r#mod {
300 Mod::Disco(r#mod) => {
301 mods.insert(
302 TypeId::of::<mods::disco::DiscoMod>(),
303 RwLock::new(Mod::Disco(r#mod)),
304 );
305 }
306 }
307 }
308
309 pub fn add_connection(&mut self, account: Account, sink: mpsc::UnboundedSender<Element>) {
310 let connection = Connection { sink };
311
312 self.connections.insert(account.clone(), connection);
313 self.current_connection = Some(account);
314 }
315
316 pub fn init(&mut self) -> Result<(), ()> {
317 let mods = self.mods.clone();
318 for (_, r#mod) in mods.iter() {
319 r#mod.try_write().unwrap().init(self)?;
320 }
321
322 Ok(())
323 }
324
325 pub fn run(mut self) {
326 let rt = TokioRuntime::new().unwrap();
327
328 let local_set = tokio::task::LocalSet::new();
343 local_set.block_on(&rt, async move {
344 self.schedule(Event::Start);
345 let mut event_rx = self.event_rx.take().unwrap();
346 let mut send_rx = self.send_rx.take().unwrap();
347
348 let mut last_events = VecDeque::new();
349 'main: loop {
350 let mut events_buf = Vec::new();
351 tokio::select! {
352 count = event_rx.recv_many(&mut events_buf, 1000) => match count {
353 0 => {
354 log::error!("Broken event channel");
355 break
356 }
357 events_count => {
358 let (priority_events, filtered_events): (VecDeque<_>, VecDeque<_>) = events_buf.drain(..).partition(|event| matches!(event,
360 | Event::ChangeWindow(_)
361 | Event::Quit));
362
363 log::trace!("Event loop got {} new events ({} priority); have {} last events", events_count, priority_events.len(), last_events.len());
364
365 last_events.extend(filtered_events);
366 for event in priority_events {
368 if self.handle_event(event).is_err() {
369 break 'main
370 }
371 }
372 let mut start = Instant::now();
373 while let Some(event) = last_events.pop_front() {
374 if self.handle_event(event).is_err() {
375 break 'main;
376 }
377 }
378 log::trace!("Event loop, delayed handling of {} events", last_events.len());
379 },
380 },
381 account_and_stanza = send_rx.recv() => match account_and_stanza {
382 Some((account, stanza)) => self.send_stanza(account, stanza),
383 None => {
384 log::error!("Broken send channel");
385 break;
386 }
387 }
388 };
389 }
390 });
391 }
392
393 pub async fn run_async(&mut self) {
394 self.schedule(Event::Start);
395 let mut event_rx = self.event_rx.take().unwrap();
396 let mut send_rx = self.send_rx.take().unwrap();
397
398 let mut last_events = VecDeque::new();
399 loop {
400 let mut events_buf = Vec::new();
401 tokio::select! {
402 count = event_rx.recv_many(&mut events_buf, 1000) => match count {
403 0 => {
404 log::error!("Broken event channel");
405 break
406 }
407 events_count => {
408 let (priority_events, filtered_events): (VecDeque<_>, VecDeque<_>) = events_buf.drain(..).partition(|event| matches!(event,
410 | Event::ChangeWindow(_)
411 | Event::Quit));
412
413 log::trace!("Event loop got {} new events ({} priority); have {} last events", events_count, priority_events.len(), last_events.len());
414
415 last_events.extend(filtered_events);
416 for event in priority_events {
418 if self.handle_event(event).is_err() {
419 break
420 }
421 }
422 while let Some(event) = last_events.pop_front() {
423 if self.handle_event(event).is_err() {
424 break;
425 }
426 }
427 log::trace!("Event loop, delayed handling of {} events", last_events.len());
428 },
429 },
430 account_and_stanza = send_rx.recv() => match account_and_stanza {
431 Some((account, stanza)) => self.send_stanza(account, stanza),
432 None => {
433 log::error!("Broken send channel");
434 break;
435 }
436 }
437 };
438 }
439 }
440
441 pub fn start(&mut self) {
442 for (name, account) in self.config.accounts.clone() {
443 if account.autoconnect {
444 self.connect(&account, account.password.clone().expect("password not set"));
446 }
447 }
448 }
449
450 fn send_stanza(&mut self, account: Account, stanza: Element) {
451 let mut raw = Vec::<u8>::new();
452 stanza.write_to(&mut raw).unwrap();
453 log::debug!("SEND: {}", String::from_utf8(raw).unwrap());
454 match self.connections.get_mut(&account) {
455 Some(connection) => {
456 if let Err(e) = connection.sink.send(stanza) {
457 log::warn!("Cannot send stanza: {}", e);
458 }
459 }
460 None => {
461 log::warn!("No connection found for {}", account);
462 }
463 }
464 }
465
466 pub fn connect(&mut self, connection_info: &ConnectionInfo, password: Password) {
467 let account: Account = match Jid::from_str(&connection_info.jid).map(Jid::try_into_full) {
468 Ok(Ok(full_jid)) => full_jid,
469 Ok(Err(bare_jid)) => {
470 let rand_string: String = rand::rng()
471 .sample_iter(&rand::distr::Alphanumeric)
472 .take(5)
473 .map(char::from)
474 .collect();
475 bare_jid
476 .with_resource_str(&format!("aparte_{rand_string}"))
477 .unwrap()
478 }
479 Err(err) => {
480 self.log(format!(
481 "Cannot connect as {}: {}",
482 connection_info.jid, err
483 ));
484 return;
485 }
486 };
487
488 self.log(format!("Connecting as {account}"));
489 let config = tokio_xmpp::AsyncConfig {
490 jid: Jid::from(account.clone()),
491 password: password.expose_secret().to_string(),
492 server: match (&connection_info.server, &connection_info.port) {
493 (Some(server), Some(port)) => tokio_xmpp::starttls::ServerConfig::Manual {
494 host: server.clone(),
495 port: *port,
496 },
497 (Some(server), None) => tokio_xmpp::starttls::ServerConfig::Manual {
498 host: server.clone(),
499 port: 5222,
500 },
501 (None, Some(port)) => tokio_xmpp::starttls::ServerConfig::Manual {
502 host: account.domain().to_string(),
503 port: *port,
504 },
505 (None, None) => tokio_xmpp::starttls::ServerConfig::UseSrv,
506 },
507 };
508 log::debug!("Connect with config: {config:?}");
509 let mut client = tokio_xmpp::AsyncClient::new_with_config(config);
510
511 client.set_reconnect(true);
512
513 let (connection_channel, mut rx) = mpsc::unbounded_channel();
514
515 self.add_connection(account.clone(), connection_channel);
516
517 let (mut writer, mut reader) = client.split();
518 tokio::spawn(async move {
520 while let Some(element) = rx.recv().await {
521 if let Err(err) = writer.send(tokio_xmpp::Packet::Stanza(element)).await {
522 log::error!("cannot send Stanza to internal channel: {}", err);
523 break;
524 }
525 }
526 });
527
528 let event_tx = self.event_tx.clone();
529
530 let reconnect = true;
531 tokio::spawn(async move {
532 while let Some(event) = reader.next().await {
533 log::debug!("XMPP Event: {:?}", event);
534 match event {
535 tokio_xmpp::Event::Disconnected(tokio_xmpp::Error::Auth(e)) => {
536 if let Err(err) =
537 event_tx.send(Event::AuthError(account.clone(), format!("{e}")))
538 {
539 log::error!("Cannot send event to internal channel: {}", err);
540 };
541 break;
542 }
543 tokio_xmpp::Event::Disconnected(e) => {
544 if let Err(err) =
545 event_tx.send(Event::Disconnected(account.clone(), format!("{e}")))
546 {
547 log::error!("Cannot send event to internal channel: {}", err);
548 };
549 if !reconnect {
550 break;
551 }
552 }
553 tokio_xmpp::Event::Online {
554 bound_jid: jid,
555 resumed: true,
556 } => {
557 log::debug!("Reconnected to {}", jid);
558 }
559 tokio_xmpp::Event::Online {
560 bound_jid: jid,
561 resumed: false,
562 } => {
563 if let Err(err) = event_tx.send(Event::Connected(account.clone(), jid)) {
564 log::error!("Cannot send event to internal channel: {}", err);
565 break;
566 }
567 }
568 tokio_xmpp::Event::Stanza(stanza) => {
569 log::debug!("RECV: {}", String::from(&stanza));
570 if let Err(err) = event_tx.send(Event::Stanza(account.clone(), stanza)) {
571 log::error!("Cannot send stanza to internal channel: {}", err);
572 break;
573 }
574 }
575 }
576 }
577 });
578 }
579
580 pub fn handle_event(&mut self, event: Event) -> Result<(), ()> {
581 log::trace!("Handle event: {:?}", event);
582
583 let before = Instant::now();
584
585 {
586 let mods = self.mods.clone();
587 for (_, r#mod) in mods.iter() {
588 let before = Instant::now();
589 r#mod.try_write().unwrap().on_event(self, &event);
590 log::trace!("{:?} handled event in {:.2?}", r#mod, before.elapsed());
591 }
592 }
593
594 match event {
595 Event::Start => {
596 self.start();
597 }
598 Event::SendMessage(account, message) => {
599 self.schedule(Event::Message(Some(account.clone()), message.clone()));
600
601 let encryption = message.encryption_recipient().and_then(|recipient| {
603 let mut crypto_engines = self.crypto_engines.lock().unwrap();
604 crypto_engines
605 .get_mut(&(account.clone(), recipient))
606 .map(|crypto_engine| crypto_engine.encrypt(self, &account, &message))
607 });
608
609 match encryption {
610 Some(Ok(encrypted_message)) => self.send(&account, encrypted_message),
611 Some(Err(e)) => {
612 log::error!("Cannot encrypt message (TODO print error in UI): {e}")
613 }
614 None => self.send(&account, message),
615 }
616 }
617 Event::Connect(account, password) => {
618 self.connect(&account, password);
619 }
620 Event::Connected(account, _) => {
621 self.log(format!("Connected as {}", account));
622 let mut presence = Presence::new(PresenceType::None);
623 presence.show = Some(PresenceShow::Chat);
624
625 let disco = self.get_mod::<mods::disco::DiscoMod>().get_disco();
626 let disco = caps::compute_disco(&disco);
627 let verification_string =
628 caps::hash_caps(&disco, xmpp_hashes::Algo::Blake2b_512).unwrap();
629 let caps = Caps::new("aparté", verification_string);
630 presence.add_payload(caps);
631
632 self.send(&account, presence);
633 }
634 Event::Disconnected(account, err) => {
635 self.log(format!("Connection lost for {}: {}", account, err));
636 }
637 Event::AuthError(account, err) => {
638 self.log(format!("Authentication error for {}: {}", account, err));
639 }
640 Event::Stanza(account, stanza) => {
641 self.handle_stanza(account, stanza);
642 }
643 Event::RawMessage {
644 account,
645 message,
646 delay,
647 archive,
648 } => {
649 self.handle_xmpp_message(account, message, delay, archive);
650 }
651 Event::Join {
652 account,
653 channel,
654 user_request,
655 } => {
656 let to = match channel.try_as_full() {
657 Ok(full_jid) => full_jid.clone(),
658 Err(bare_jid) => bare_jid
659 .with_resource_str(account.node().as_ref().unwrap())
660 .unwrap(),
661 };
662 let from: Jid = account.clone().into();
663
664 let mut presence = Presence::new(PresenceType::None);
665 presence = presence.with_to(Jid::from(to.clone()));
666 presence = presence.with_from(from);
667 presence.add_payload(Muc::new());
668 self.send(&account, presence);
669
670 self.log(format!("Joined {}", channel));
672 self.schedule(Event::Joined {
673 account: account.clone(),
674 channel: to,
675 user_request,
676 });
677 }
678 Event::Quit => {
679 return Err(());
680 }
681 _ => {}
682 }
683
684 log::trace!("Fully handled event in {:.2?}", before.elapsed());
685 Ok(())
686 }
687
688 fn handle_stanza(&mut self, account: Account, stanza: Element) {
689 match stanza.name() {
690 "iq" => match Iq::try_from(stanza.clone()) {
691 Ok(iq) => self.handle_iq(account, iq),
692 Err(err) => {
693 log::error!("{}", err);
694 if let Some(id) = stanza.attr("id") {
695 self.errored_iq(id, err.into());
696 }
697 }
698 },
699 "presence" => match Presence::try_from(stanza) {
700 Ok(presence) => self.schedule(Event::Presence(account, presence)),
701 Err(err) => log::error!("{}", err),
702 },
703 "message" => match XmppParsersMessage::try_from(stanza) {
704 Ok(message) => self.handle_xmpp_message(account, message, None, false),
705 Err(err) => log::error!("{}", err),
706 },
707 _ => log::error!("unknown stanza: {}", stanza.name()),
708 }
709 }
710
711 fn handle_xmpp_message(
712 &mut self,
713 account: Account,
714 message: XmppParsersMessage,
715 delay: Option<Delay>,
716 archive: bool,
717 ) {
718 let mut best_match = 0f64;
719 let mut matched_mod = None;
720 let mut message = message;
721
722 let encryption_ns = message
723 .payloads
724 .iter()
725 .find_map(|p| {
726 xmpp_parsers::eme::ExplicitMessageEncryption::try_from((*p).clone())
727 .ok()
728 .map(|eme| eme.namespace)
729 })
730 .or(message.payloads.iter().find_map(|p| {
731 legacy_omemo::Encrypted::try_from((*p).clone())
732 .ok()
733 .map(|_| xmpp_parsers::ns::LEGACY_OMEMO.to_string())
734 }));
735
736 if let (Some(encryption_ns), Some(from)) = (encryption_ns, message.from.clone()) {
739 let mut crypto_engines = self.crypto_engines.lock().unwrap();
740 if let Some(crypto_engine) = crypto_engines.get_mut(&(account.clone(), from.to_bare()))
741 {
742 if encryption_ns == crypto_engine.ns() {
743 message = match crypto_engine.decrypt(self, &account, &message) {
744 Ok(message) => message,
745 Err(err) => {
746 log::error!(
747 "Cannot decrypt message with {}: {}",
748 crypto_engine.ns(),
749 err
750 );
751 message
752 }
753 };
754 } else {
755 log::warn!(
756 "Incompatible crypto engine found for {:?} (found {} expecting {})",
757 message.from,
758 crypto_engine.ns(),
759 encryption_ns
760 );
761 }
762 } else {
763 log::warn!(
764 "No crypto engine found for {:?} (encrypted with {})",
765 message.from,
766 encryption_ns
767 );
768 }
769 }
770
771 let mods = self.mods.clone();
772 for (_, r#mod) in mods.iter() {
773 let message_match = r#mod
774 .try_write()
775 .unwrap()
776 .can_handle_xmpp_message(self, &account, &message, &delay);
777 if message_match > best_match {
778 matched_mod = Some(r#mod);
779 best_match = message_match;
780 }
781 }
782
783 if let Some(r#mod) = matched_mod {
784 log::debug!("Handling xmpp message by {:?}", r#mod);
785 r#mod
786 .try_write()
787 .unwrap()
788 .handle_xmpp_message(self, &account, &message, &delay, archive);
789 } else {
790 log::info!("Don't know how to handle message: {:?}", message);
791 }
792 }
793
794 fn handle_iq(&mut self, account: Account, iq: Iq) {
795 if let Ok(uuid) = Uuid::from_str(&iq.id) {
797 let state = self.pending_iq.lock().unwrap().remove(&uuid);
798 if let Some(state) = state {
799 match state {
800 PendingIqState::Waiting(waker) => {
801 self.pending_iq
803 .lock()
804 .unwrap()
805 .insert(uuid, PendingIqState::Finished(iq));
806 if let Some(waker) = waker {
807 waker.wake();
808 }
809 }
810 PendingIqState::Errored(_err) => {
811 log::info!("Received multiple response for Iq: {}", uuid);
812 self.pending_iq
814 .lock()
815 .unwrap()
816 .insert(uuid, PendingIqState::Finished(iq));
817 }
818 PendingIqState::Finished(iq) => {
819 log::info!("Received multiple response for Iq: {}", uuid);
820 self.pending_iq
822 .lock()
823 .unwrap()
824 .insert(uuid, PendingIqState::Finished(iq));
825 }
826 }
827 return;
828 }
829 }
830
831 match iq.payload {
833 IqType::Error(payload) => {
834 if let Some(text) = payload.texts.get("en") {
835 let message = Message::log(text.to_string());
836 self.schedule(Event::Message(Some(account.clone()), message));
837 }
838 }
839 IqType::Result(payload) => {
840 log::info!("Received unexpected Iq result {:?}", payload);
841 }
842 _ => {
843 self.schedule(Event::Iq(account, iq));
844 }
845 }
846 }
847
848 fn errored_iq(&mut self, id: &str, err: anyhow::Error) {
849 if let Ok(uuid) = Uuid::from_str(id) {
850 let state = self.pending_iq.lock().unwrap().remove(&uuid);
851 if let Some(state) = state {
852 match state {
853 PendingIqState::Waiting(waker) => {
854 self.pending_iq
856 .lock()
857 .unwrap()
858 .insert(uuid, PendingIqState::Errored(err));
859 if let Some(waker) = waker {
860 waker.wake();
861 }
862 }
863 PendingIqState::Errored(err) => {
864 log::warn!("Received multiple response for Iq: {}", uuid);
865 self.pending_iq
867 .lock()
868 .unwrap()
869 .insert(uuid, PendingIqState::Errored(err));
870 }
871 PendingIqState::Finished(iq) => {
872 log::warn!("Received multiple response for Iq: {}", uuid);
873 self.pending_iq
875 .lock()
876 .unwrap()
877 .insert(uuid, PendingIqState::Finished(iq));
878 }
879 }
880 }
881 }
882 }
883
884 pub fn proxy(&self) -> AparteAsync {
886 AparteAsync {
887 current_connection: self.current_connection.clone(),
888 event_tx: self.event_tx.clone(),
889 send_tx: self.send_tx.clone(),
890 pending_iq: self.pending_iq.clone(),
891 config: self.config.clone(),
892 crypto_engines: self.crypto_engines.clone(),
893 }
894 }
895
896 pub fn spawn<F>(future: F)
897 where
898 F: Future + Send + 'static,
899 F::Output: Send + 'static,
900 {
901 tokio::spawn(future);
902 }
903
904 pub fn add_crypto_engine(
906 &mut self,
907 account: &Account,
908 recipient: &BareJid,
909 crypto_engine: CryptoEngine,
910 ) {
911 let mut crypto_engines = self.crypto_engines.lock().unwrap();
912 crypto_engines.insert((account.clone(), recipient.clone()), crypto_engine);
913 }
914
915 pub fn send<T>(&mut self, account: &Account, element: T)
916 where
917 T: TryInto<Element> + Debug,
918 {
919 match element.try_into() {
920 Ok(stanza) => self.send_tx.send((account.clone(), stanza)).unwrap(),
921 Err(_e) => {
922 log::error!("Cannot convert to element");
923 }
924 };
925 }
926
927 pub fn schedule(&mut self, event: Event) {
928 log::trace!("Schedule event {:?}", event);
929 self.event_tx.send(event).unwrap();
930 }
931
932 pub fn log<T: ToString>(&mut self, message: T) {
933 let message = Message::log(message.to_string());
934 self.schedule(Event::Message(None, message));
935 }
936
937 pub fn error<T: Display>(&mut self, message: T, err: anyhow::Error) {
938 let message = Message::log(format!("{}: {:#}", message, err));
939 self.schedule(Event::Message(None, message));
940 }
941
942 pub fn get_mod<'a, T>(&'a self) -> RwLockReadGuard<'a, T>
943 where
944 T: 'static,
945 for<'b> &'b T: From<&'b Mod>,
946 {
947 match self.mods.get(&TypeId::of::<T>()) {
948 Some(r#mod) => RwLockReadGuard::map(r#mod.try_read().unwrap(), |m| m.into()),
949 None => unreachable!(),
950 }
951 }
952
953 #[allow(unused)]
954 pub fn get_mod_mut<'a, T>(&'a self) -> RwLockMappedWriteGuard<'a, T>
955 where
956 T: 'static,
957 for<'b> &'b mut T: From<&'b mut Mod>,
958 {
959 match self.mods.get(&TypeId::of::<T>()) {
960 Some(r#mod) => RwLockWriteGuard::map(r#mod.try_write().unwrap(), |m| m.into()),
961 None => unreachable!(),
962 }
963 }
964
965 pub fn current_account(&self) -> Option<Account> {
966 self.current_connection.clone()
967 }
968 }
969
970 #[derive(Clone)]
971 pub struct AparteAsync {
972 current_connection: Option<Account>,
973 event_tx: mpsc::UnboundedSender<Event>,
974 send_tx: mpsc::UnboundedSender<(Account, Element)>,
975 crypto_engines: Arc<Mutex<HashMap<(Account, BareJid), CryptoEngine>>>,
976 pub(crate) pending_iq: Arc<Mutex<HashMap<Uuid, PendingIqState>>>,
977 pub config: Config,
978 }
979
980 impl AparteAsync {
981 pub fn connect(&mut self, connection_info: &ConnectionInfo) {
982 let connection_info = connection_info.clone();
983 let password = connection_info.password.clone();
984 self.schedule(Event::Connect(connection_info, password.unwrap()));
985 }
986
987 pub fn send(&mut self, account: &Account, stanza: Element) {
988 self.send_tx.send((account.clone(), stanza)).unwrap();
989 }
990
991 pub fn iq(&mut self, account: &Account, iq: Iq) -> IqFuture {
992 IqFuture::new(self.clone(), account, iq)
993 }
994
995 pub fn schedule(&mut self, event: Event) {
996 self.event_tx.send(event).unwrap();
997 }
998
999 pub fn log<T: ToString>(&mut self, message: T) {
1000 let message = Message::log(message.to_string());
1001 self.schedule(Event::Message(None, message));
1002 }
1003
1004 pub fn error<T: Display>(&mut self, message: T, err: anyhow::Error) {
1005 let message = Message::log(format!("{}: {:#}", message, err));
1006 self.schedule(Event::Message(None, message));
1007 }
1008
1009 pub fn current_account(&self) -> Option<Account> {
1010 self.current_connection.clone()
1011 }
1012
1013 pub fn add_crypto_engine(
1014 &mut self,
1015 account: &Account,
1016 recipient: &BareJid,
1017 crypto_engine: CryptoEngine,
1018 ) {
1019 let mut crypto_engines = self.crypto_engines.lock().unwrap();
1020 crypto_engines.insert((account.clone(), recipient.clone()), crypto_engine);
1021 }
1022 }