koibumi_node/
user_manager.rs

1use std::{collections::HashMap, convert::TryFrom};
2
3use async_std::sync::Arc;
4use futures::{channel::mpsc::Receiver, sink::SinkExt, stream::StreamExt};
5use log::{debug, error};
6
7use koibumi_core::{address::Address, identity::Private as PrivateIdentity, message, object};
8
9use crate::{connection_loop::Context, manager::Event as BmEvent};
10
11/// An object represents an user.
12#[derive(Clone, PartialEq, Eq, Hash, Debug)]
13pub struct User {
14    id: Vec<u8>,
15    subscriptions: Vec<Address>,
16    private_identities: Vec<PrivateIdentity>,
17}
18
19impl User {
20    /// Constructs a new user object from an ID and a subscription list.
21    pub fn new(
22        id: Vec<u8>,
23        subscriptions: Vec<Address>,
24        private_identities: Vec<PrivateIdentity>,
25    ) -> Self {
26        Self {
27            id,
28            subscriptions,
29            private_identities,
30        }
31    }
32}
33
34#[derive(Debug)]
35pub enum Event {
36    User(User),
37    //Getpubkey(message::Object),
38    //Pubkey(message::Object),
39    Msg(message::Object),
40    Broadcast(message::Object),
41    AddIdentity {
42        id: Vec<u8>,
43        identity: PrivateIdentity,
44    },
45    Subscribe {
46        id: Vec<u8>,
47        address: Address,
48    },
49}
50
51pub async fn manage(ctx: Arc<Context>, mut receiver: Receiver<Event>) {
52    let mut bm_event_sender = ctx.bm_event_sender().clone();
53
54    let mut users: HashMap<Vec<u8>, User> = HashMap::new();
55
56    while let Some(event) = receiver.next().await {
57        match event {
58            Event::User(user) => {
59                users.insert(user.id.clone(), user);
60            }
61            Event::Msg(object) => match object::Msg::try_from(object.clone()) {
62                Ok(msg) => {
63                    for user in users.values() {
64                        for identity in &user.private_identities {
65                            if let Err(_err) = msg.decrypt(object.header(), identity) {
66                                continue;
67                            }
68
69                            if let Err(_err) = bm_event_sender
70                                .send(BmEvent::Msg {
71                                    user_id: user.id.clone(),
72                                    address: identity.address().clone(),
73                                    object: object.clone(),
74                                })
75                                .await
76                            {
77                                continue;
78                            }
79                        }
80                    }
81                }
82                Err(err) => {
83                    debug!("{}", err);
84                }
85            },
86            Event::Broadcast(object) => match object::Broadcast::try_from(object.clone()) {
87                Ok(broadcast) => {
88                    for user in users.values() {
89                        for subscription in &user.subscriptions {
90                            match &broadcast {
91                                object::Broadcast::V4(_) => {
92                                    if let Err(_err) =
93                                        broadcast.decrypt(object.header(), &subscription)
94                                    {
95                                        continue;
96                                    }
97                                }
98                                object::Broadcast::V5(v5) => {
99                                    if subscription.broadcast_tag() != *v5.tag() {
100                                        continue;
101                                    }
102                                }
103                            }
104
105                            if let Err(err) = bm_event_sender
106                                .send(BmEvent::Broadcast {
107                                    user_id: user.id.clone(),
108                                    address: subscription.clone(),
109                                    object: object.clone(),
110                                })
111                                .await
112                            {
113                                error!("{}", err);
114                                continue;
115                            }
116                        }
117                    }
118                }
119                Err(err) => {
120                    debug!("{}", err);
121                }
122            },
123            Event::AddIdentity { id, identity } => {
124                if let Some(user) = users.get_mut(&id) {
125                    if !user.private_identities.contains(&identity) {
126                        user.private_identities.push(identity);
127                    }
128                }
129            }
130            Event::Subscribe { id, address } => {
131                if let Some(user) = users.get_mut(&id) {
132                    if !user.subscriptions.contains(&address) {
133                        user.subscriptions.push(address);
134                    }
135                }
136            }
137        }
138    }
139}