koibumi_node_sync/
user_manager.rs

1use std::{collections::HashMap, convert::TryFrom, sync::Arc};
2
3use crossbeam_channel::{select, Receiver};
4use log::{debug, error};
5
6use koibumi_core::{address::Address, identity::Private as PrivateIdentity, message, object};
7
8use crate::{
9    connection_loop::{Context, ShutdownCommand},
10    manager::Event as BmEvent,
11};
12
13/// An object represents an user.
14#[derive(Clone, PartialEq, Eq, Hash, Debug)]
15pub struct User {
16    id: Vec<u8>,
17    subscriptions: Vec<Address>,
18    private_identities: Vec<PrivateIdentity>,
19}
20
21impl User {
22    /// Constructs a new user object from an ID and a subscription list.
23    pub fn new(
24        id: Vec<u8>,
25        subscriptions: Vec<Address>,
26        private_identities: Vec<PrivateIdentity>,
27    ) -> Self {
28        Self {
29            id,
30            subscriptions,
31            private_identities,
32        }
33    }
34}
35
36#[derive(Debug)]
37pub enum Event {
38    User(User),
39    //Getpubkey(message::Object),
40    //Pubkey(message::Object),
41    Msg(message::Object),
42    Broadcast(message::Object),
43    AddIdentity {
44        id: Vec<u8>,
45        identity: PrivateIdentity,
46    },
47    Subscribe {
48        id: Vec<u8>,
49        address: Address,
50    },
51}
52
53pub fn manage(
54    ctx: Arc<Context>,
55    receiver: Receiver<Event>,
56    shutdown_receiver: Receiver<ShutdownCommand>,
57) {
58    let bm_event_sender = ctx.bm_event_sender().clone();
59
60    let mut users: HashMap<Vec<u8>, User> = HashMap::new();
61
62    loop {
63        select! {
64            recv(receiver) -> event => match event {
65                Ok(event) => {
66                    match event {
67                        Event::User(user) => {
68                            users.insert(user.id.clone(), user);
69                        }
70                        Event::Msg(object) => match object::Msg::try_from(object.clone()) {
71                            Ok(msg) => {
72                                for user in users.values() {
73                                    for identity in &user.private_identities {
74                                        if let Err(_err) = msg.decrypt(object.header(), identity) {
75                                            continue;
76                                        }
77                                        if let Err(_err) = bm_event_sender.send(BmEvent::Msg {
78                                            user_id: user.id.clone(),
79                                            address: identity.address().clone(),
80                                            object: object.clone(),
81                                        }) {
82                                            continue;
83                                        }
84                                    }
85                                }
86                            }
87                            Err(err) => {
88                                debug!("{}", err);
89                            }
90                        },
91                        Event::Broadcast(object) => match object::Broadcast::try_from(object.clone()) {
92                            Ok(broadcast) => {
93                                for user in users.values() {
94                                    for subscription in &user.subscriptions {
95                                        match &broadcast {
96                                            object::Broadcast::V4(_) => {
97                                                if let Err(_err) =
98                                                    broadcast.decrypt(object.header(), &subscription)
99                                                {
100                                                    continue;
101                                                }
102                                            }
103                                            object::Broadcast::V5(v5) => {
104                                                if subscription.broadcast_tag() != *v5.tag() {
105                                                    continue;
106                                                }
107                                            }
108                                        }
109                                        if let Err(err) = bm_event_sender.send(BmEvent::Broadcast {
110                                            user_id: user.id.clone(),
111                                            address: subscription.clone(),
112                                            object: object.clone(),
113                                        }) {
114                                            error!("{}", err);
115                                            continue;
116                                        }
117                                    }
118                                }
119                            }
120                            Err(err) => {
121                                debug!("{}", err);
122                            }
123                        },
124                        Event::AddIdentity { id, identity } => {
125                            if let Some(user) = users.get_mut(&id) {
126                                if !user.private_identities.contains(&identity) {
127                                    user.private_identities.push(identity);
128                                }
129                            }
130                        }
131                        Event::Subscribe { id, address } => {
132                            if let Some(user) = users.get_mut(&id) {
133                                if !user.subscriptions.contains(&address) {
134                                    user.subscriptions.push(address);
135                                }
136                            }
137                        }
138                    }
139                },
140                Err(_err) => break,
141            },
142            recv(shutdown_receiver) -> _v => break,
143        }
144    }
145}