1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::{collections::HashMap, convert::TryFrom};
use async_std::sync::Arc;
use futures::{channel::mpsc::Receiver, sink::SinkExt, stream::StreamExt};
use log::{debug, error};
use koibumi_core::{address::Address, identity::Private as PrivateIdentity, message, object};
use crate::{connection_loop::Context, manager::Event as BmEvent};
/// A user known to the manager, together with the addresses and keys
/// that are checked against incoming objects on the user's behalf.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct User {
/// Identifier of the user; `manage` uses it as the key of its user table
/// and echoes it back in the events it emits.
id: Vec<u8>,
/// Addresses whose broadcasts are matched against incoming broadcast objects.
subscriptions: Vec<Address>,
/// Private identities that are tried when decrypting incoming msg objects.
private_identities: Vec<PrivateIdentity>,
}
impl User {
pub fn new(
id: Vec<u8>,
subscriptions: Vec<Address>,
private_identities: Vec<PrivateIdentity>,
) -> Self {
Self {
id,
subscriptions,
private_identities,
}
}
}
/// Events consumed by `manage`.
#[derive(Debug)]
pub enum Event {
/// Registers a user; an existing entry with the same id is replaced.
User(User),
/// An incoming object to be handled as a msg: it is tried against every
/// private identity of every registered user.
Msg(message::Object),
/// An incoming object to be handled as a broadcast: it is matched against
/// every subscription of every registered user.
Broadcast(message::Object),
}
pub async fn manage(ctx: Arc<Context>, mut receiver: Receiver<Event>) {
let mut bm_event_sender = ctx.bm_event_sender().clone();
let mut users: HashMap<Vec<u8>, User> = HashMap::new();
while let Some(event) = receiver.next().await {
match event {
Event::User(user) => {
users.insert(user.id.clone(), user);
}
Event::Msg(object) => match object::Msg::try_from(object.clone()) {
Ok(msg) => {
for user in users.values() {
for identity in &user.private_identities {
if let Err(_err) = msg.decrypt(object.header(), identity) {
continue;
}
if let Err(_err) = bm_event_sender
.send(BmEvent::Msg {
user_id: user.id.clone(),
address: identity.address().clone(),
object: object.clone(),
})
.await
{
continue;
}
}
}
}
Err(err) => {
debug!("{}", err);
}
},
Event::Broadcast(object) => match object::Broadcast::try_from(object.clone()) {
Ok(broadcast) => {
for user in users.values() {
for subscription in &user.subscriptions {
match &broadcast {
object::Broadcast::V4(_) => {
if let Err(_err) =
broadcast.decrypt(object.header(), &subscription)
{
continue;
}
}
object::Broadcast::V5(v5) => {
if subscription.broadcast_tag() != *v5.tag() {
continue;
}
}
}
if let Err(err) = bm_event_sender
.send(BmEvent::Broadcast {
user_id: user.id.clone(),
address: subscription.clone(),
object: object.clone(),
})
.await
{
error!("{}", err);
continue;
}
}
}
}
Err(err) => {
debug!("{}", err);
}
},
}
}
}