1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::{collections::HashMap, convert::TryFrom};

use async_std::sync::Arc;
use futures::{channel::mpsc::Receiver, sink::SinkExt, stream::StreamExt};
use log::{debug, error};

use koibumi_core::{address::Address, identity::Private as PrivateIdentity, message, object};

use crate::{connection_loop::Context, manager::Event as BmEvent};

/// An object that represents a user.
///
/// A user is identified by an opaque byte-string ID and carries the
/// addresses and private identities that [`manage`] consults when it
/// dispatches incoming objects.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct User {
    /// Identifier of the user; used as the key of the user map in `manage`.
    id: Vec<u8>,
    /// Addresses that incoming broadcast objects are matched against.
    subscriptions: Vec<Address>,
    /// Private identities that incoming msg objects are decrypted with.
    private_identities: Vec<PrivateIdentity>,
}

impl User {
    /// Constructs a new user object from an ID, a subscription list
    /// and a list of private identities.
    ///
    /// The arguments are stored as given; nothing is validated here.
    pub fn new(
        id: Vec<u8>,
        subscriptions: Vec<Address>,
        private_identities: Vec<PrivateIdentity>,
    ) -> Self {
        Self {
            id,
            subscriptions,
            private_identities,
        }
    }
}

/// An event accepted by [`manage`].
#[derive(Debug)]
pub enum Event {
    /// Registers a user. A user already stored under the same ID is replaced.
    User(User),
    // The getpubkey and pubkey variants below are currently disabled.
    //Getpubkey(message::Object),
    //Pubkey(message::Object),
    /// An object to be interpreted as a msg and tried against the private
    /// identities of the registered users.
    Msg(message::Object),
    /// An object to be interpreted as a broadcast and matched against the
    /// subscriptions of the registered users.
    Broadcast(message::Object),
}

/// Dispatches incoming msg and broadcast objects to the registered users.
///
/// The loop runs until `receiver` yields no more events. Each event is
/// handled as follows:
///
/// * [`Event::User`] stores the user in a local map keyed by the user ID,
///   replacing any user previously stored under the same ID.
/// * [`Event::Msg`] converts the object into an `object::Msg` and, for every
///   private identity of every registered user that decrypts it
///   successfully, sends a `BmEvent::Msg` through the sender obtained from
///   `ctx`.
/// * [`Event::Broadcast`] converts the object into an `object::Broadcast`
///   and, for every subscription of every registered user that matches,
///   sends a `BmEvent::Broadcast`. A V4 broadcast matches when decryption
///   with the subscribed address succeeds; a V5 broadcast matches when its
///   tag equals the broadcast tag of the subscribed address.
///
/// Objects that fail conversion are logged at debug level and dropped.
/// A failure to send an event is logged at error level and processing
/// continues with the next candidate.
pub async fn manage(ctx: Arc<Context>, mut receiver: Receiver<Event>) {
    let mut bm_event_sender = ctx.bm_event_sender().clone();

    // Registered users, keyed by user ID.
    let mut users: HashMap<Vec<u8>, User> = HashMap::new();

    while let Some(event) = receiver.next().await {
        match event {
            Event::User(user) => {
                users.insert(user.id.clone(), user);
            }
            Event::Msg(object) => {
                // The original object is forwarded later, so convert a clone.
                let msg = match object::Msg::try_from(object.clone()) {
                    Ok(msg) => msg,
                    Err(err) => {
                        debug!("{}", err);
                        continue;
                    }
                };

                for user in users.values() {
                    for identity in &user.private_identities {
                        // A decryption failure only means that the msg is
                        // not addressed to this identity.
                        if msg.decrypt(object.header(), identity).is_err() {
                            continue;
                        }

                        let notification = BmEvent::Msg {
                            user_id: user.id.clone(),
                            address: identity.address().clone(),
                            object: object.clone(),
                        };
                        // Report send failures the same way the broadcast
                        // branch does instead of dropping them silently.
                        if let Err(err) = bm_event_sender.send(notification).await {
                            error!("{}", err);
                        }
                    }
                }
            }
            Event::Broadcast(object) => {
                // The original object is forwarded later, so convert a clone.
                let broadcast = match object::Broadcast::try_from(object.clone()) {
                    Ok(broadcast) => broadcast,
                    Err(err) => {
                        debug!("{}", err);
                        continue;
                    }
                };

                for user in users.values() {
                    for subscription in &user.subscriptions {
                        match &broadcast {
                            object::Broadcast::V4(_) => {
                                // V4 carries no tag: the subscription
                                // matches only if decryption succeeds.
                                if broadcast.decrypt(object.header(), subscription).is_err() {
                                    continue;
                                }
                            }
                            object::Broadcast::V5(v5) => {
                                // V5 is matched by comparing tags.
                                if subscription.broadcast_tag() != *v5.tag() {
                                    continue;
                                }
                            }
                        }

                        let notification = BmEvent::Broadcast {
                            user_id: user.id.clone(),
                            address: subscription.clone(),
                            object: object.clone(),
                        };
                        if let Err(err) = bm_event_sender.send(notification).await {
                            error!("{}", err);
                        }
                    }
                }
            }
        }
    }
}