1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::{collections::HashMap, convert::TryFrom};

use async_std::sync::Arc;
use futures::{channel::mpsc::Receiver, sink::SinkExt, stream::StreamExt};
use log::{debug, error};

use koibumi_core::{address::Address, message, object};

use crate::{connection_loop::Context, manager::Event as BmEvent};

/// An object represents an user.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct User {
    id: Vec<u8>,
    subscriptions: Vec<Address>,
}

impl User {
    /// Constructs a new user object from an ID and a subscription list.
    pub fn new(id: Vec<u8>, subscriptions: Vec<Address>) -> Self {
        Self { id, subscriptions }
    }
}

#[derive(Debug)]
pub enum Event {
    User(User),
    //Getpubkey(object::Getpubkey),
    //Pubkey(object::Pubkey),
    //Msg(object::Msg),
    Broadcast(message::Object),
}

pub async fn manage(ctx: Arc<Context>, mut receiver: Receiver<Event>) {
    let mut bm_event_sender = ctx.bm_event_sender().clone();

    let mut users: HashMap<Vec<u8>, User> = HashMap::new();

    while let Some(event) = receiver.next().await {
        match event {
            Event::User(user) => {
                users.insert(user.id.clone(), user);
            }
            Event::Broadcast(object) => match object::Broadcast::try_from(object.clone()) {
                Ok(broadcast) => {
                    for user in users.values() {
                        for subscription in &user.subscriptions {
                            match &broadcast {
                                object::Broadcast::V4(_) => {
                                    if let Err(_err) =
                                        broadcast.decrypt(object.header(), &subscription)
                                    {
                                        continue;
                                    }
                                }
                                object::Broadcast::V5(v5) => {
                                    if subscription.tag() != *v5.tag() {
                                        continue;
                                    }
                                }
                            }

                            if let Err(err) = bm_event_sender
                                .send(BmEvent::Broadcast {
                                    user_id: user.id.clone(),
                                    address: subscription.clone(),
                                    object: object.clone(),
                                })
                                .await
                            {
                                error!(target: "koibumi", "{}", err);
                                continue;
                            }
                        }
                    }
                }
                Err(err) => {
                    debug!(target: "koibumi", "{}", err);
                }
            },
        }
    }
}