koibumi_node_sync/
user_manager.rs1use std::{collections::HashMap, convert::TryFrom, sync::Arc};
2
3use crossbeam_channel::{select, Receiver};
4use log::{debug, error};
5
6use koibumi_core::{address::Address, identity::Private as PrivateIdentity, message, object};
7
8use crate::{
9 connection_loop::{Context, ShutdownCommand},
10 manager::Event as BmEvent,
11};
12
13#[derive(Clone, PartialEq, Eq, Hash, Debug)]
15pub struct User {
16 id: Vec<u8>,
17 subscriptions: Vec<Address>,
18 private_identities: Vec<PrivateIdentity>,
19}
20
21impl User {
22 pub fn new(
24 id: Vec<u8>,
25 subscriptions: Vec<Address>,
26 private_identities: Vec<PrivateIdentity>,
27 ) -> Self {
28 Self {
29 id,
30 subscriptions,
31 private_identities,
32 }
33 }
34}
35
/// Events consumed by the [`manage`] loop.
#[derive(Debug)]
pub enum Event {
    /// Registers a user; an existing entry with the same id is replaced.
    User(User),
    /// An object to be interpreted as a msg and tried against every
    /// registered user's private identities.
    Msg(message::Object),
    /// An object to be interpreted as a broadcast and matched against every
    /// registered user's subscriptions.
    Broadcast(message::Object),
    /// Adds a private identity to the user with the given id, if that user
    /// is registered and does not already hold the identity.
    AddIdentity {
        id: Vec<u8>,
        identity: PrivateIdentity,
    },
    /// Adds a subscription address to the user with the given id, if that
    /// user is registered and is not already subscribed to the address.
    Subscribe {
        id: Vec<u8>,
        address: Address,
    },
}
52
53pub fn manage(
54 ctx: Arc<Context>,
55 receiver: Receiver<Event>,
56 shutdown_receiver: Receiver<ShutdownCommand>,
57) {
58 let bm_event_sender = ctx.bm_event_sender().clone();
59
60 let mut users: HashMap<Vec<u8>, User> = HashMap::new();
61
62 loop {
63 select! {
64 recv(receiver) -> event => match event {
65 Ok(event) => {
66 match event {
67 Event::User(user) => {
68 users.insert(user.id.clone(), user);
69 }
70 Event::Msg(object) => match object::Msg::try_from(object.clone()) {
71 Ok(msg) => {
72 for user in users.values() {
73 for identity in &user.private_identities {
74 if let Err(_err) = msg.decrypt(object.header(), identity) {
75 continue;
76 }
77 if let Err(_err) = bm_event_sender.send(BmEvent::Msg {
78 user_id: user.id.clone(),
79 address: identity.address().clone(),
80 object: object.clone(),
81 }) {
82 continue;
83 }
84 }
85 }
86 }
87 Err(err) => {
88 debug!("{}", err);
89 }
90 },
91 Event::Broadcast(object) => match object::Broadcast::try_from(object.clone()) {
92 Ok(broadcast) => {
93 for user in users.values() {
94 for subscription in &user.subscriptions {
95 match &broadcast {
96 object::Broadcast::V4(_) => {
97 if let Err(_err) =
98 broadcast.decrypt(object.header(), &subscription)
99 {
100 continue;
101 }
102 }
103 object::Broadcast::V5(v5) => {
104 if subscription.broadcast_tag() != *v5.tag() {
105 continue;
106 }
107 }
108 }
109 if let Err(err) = bm_event_sender.send(BmEvent::Broadcast {
110 user_id: user.id.clone(),
111 address: subscription.clone(),
112 object: object.clone(),
113 }) {
114 error!("{}", err);
115 continue;
116 }
117 }
118 }
119 }
120 Err(err) => {
121 debug!("{}", err);
122 }
123 },
124 Event::AddIdentity { id, identity } => {
125 if let Some(user) = users.get_mut(&id) {
126 if !user.private_identities.contains(&identity) {
127 user.private_identities.push(identity);
128 }
129 }
130 }
131 Event::Subscribe { id, address } => {
132 if let Some(user) = users.get_mut(&id) {
133 if !user.subscriptions.contains(&address) {
134 user.subscriptions.push(address);
135 }
136 }
137 }
138 }
139 },
140 Err(_err) => break,
141 },
142 recv(shutdown_receiver) -> _v => break,
143 }
144 }
145}