koibumi_node/
user_manager.rs1use std::{collections::HashMap, convert::TryFrom};
2
3use async_std::sync::Arc;
4use futures::{channel::mpsc::Receiver, sink::SinkExt, stream::StreamExt};
5use log::{debug, error};
6
7use koibumi_core::{address::Address, identity::Private as PrivateIdentity, message, object};
8
9use crate::{connection_loop::Context, manager::Event as BmEvent};
10
11#[derive(Clone, PartialEq, Eq, Hash, Debug)]
13pub struct User {
14 id: Vec<u8>,
15 subscriptions: Vec<Address>,
16 private_identities: Vec<PrivateIdentity>,
17}
18
19impl User {
20 pub fn new(
22 id: Vec<u8>,
23 subscriptions: Vec<Address>,
24 private_identities: Vec<PrivateIdentity>,
25 ) -> Self {
26 Self {
27 id,
28 subscriptions,
29 private_identities,
30 }
31 }
32}
33
34#[derive(Debug)]
35pub enum Event {
36 User(User),
37 Msg(message::Object),
40 Broadcast(message::Object),
41 AddIdentity {
42 id: Vec<u8>,
43 identity: PrivateIdentity,
44 },
45 Subscribe {
46 id: Vec<u8>,
47 address: Address,
48 },
49}
50
51pub async fn manage(ctx: Arc<Context>, mut receiver: Receiver<Event>) {
52 let mut bm_event_sender = ctx.bm_event_sender().clone();
53
54 let mut users: HashMap<Vec<u8>, User> = HashMap::new();
55
56 while let Some(event) = receiver.next().await {
57 match event {
58 Event::User(user) => {
59 users.insert(user.id.clone(), user);
60 }
61 Event::Msg(object) => match object::Msg::try_from(object.clone()) {
62 Ok(msg) => {
63 for user in users.values() {
64 for identity in &user.private_identities {
65 if let Err(_err) = msg.decrypt(object.header(), identity) {
66 continue;
67 }
68
69 if let Err(_err) = bm_event_sender
70 .send(BmEvent::Msg {
71 user_id: user.id.clone(),
72 address: identity.address().clone(),
73 object: object.clone(),
74 })
75 .await
76 {
77 continue;
78 }
79 }
80 }
81 }
82 Err(err) => {
83 debug!("{}", err);
84 }
85 },
86 Event::Broadcast(object) => match object::Broadcast::try_from(object.clone()) {
87 Ok(broadcast) => {
88 for user in users.values() {
89 for subscription in &user.subscriptions {
90 match &broadcast {
91 object::Broadcast::V4(_) => {
92 if let Err(_err) =
93 broadcast.decrypt(object.header(), &subscription)
94 {
95 continue;
96 }
97 }
98 object::Broadcast::V5(v5) => {
99 if subscription.broadcast_tag() != *v5.tag() {
100 continue;
101 }
102 }
103 }
104
105 if let Err(err) = bm_event_sender
106 .send(BmEvent::Broadcast {
107 user_id: user.id.clone(),
108 address: subscription.clone(),
109 object: object.clone(),
110 })
111 .await
112 {
113 error!("{}", err);
114 continue;
115 }
116 }
117 }
118 }
119 Err(err) => {
120 debug!("{}", err);
121 }
122 },
123 Event::AddIdentity { id, identity } => {
124 if let Some(user) = users.get_mut(&id) {
125 if !user.private_identities.contains(&identity) {
126 user.private_identities.push(identity);
127 }
128 }
129 }
130 Event::Subscribe { id, address } => {
131 if let Some(user) = users.get_mut(&id) {
132 if !user.subscriptions.contains(&address) {
133 user.subscriptions.push(address);
134 }
135 }
136 }
137 }
138 }
139}