1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::{collections::HashMap, convert::TryFrom};
use async_std::sync::Arc;
use futures::{channel::mpsc::Receiver, sink::SinkExt, stream::StreamExt};
use log::{debug, error};
use koibumi_core::{address::Address, message, object};
use crate::{connection_loop::Context, manager::Event as BmEvent};
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct User {
id: Vec<u8>,
subscriptions: Vec<Address>,
}
impl User {
pub fn new(id: Vec<u8>, subscriptions: Vec<Address>) -> Self {
Self { id, subscriptions }
}
}
#[derive(Debug)]
pub enum Event {
User(User),
Broadcast(message::Object),
}
pub async fn manage(ctx: Arc<Context>, mut receiver: Receiver<Event>) {
let mut bm_event_sender = ctx.bm_event_sender().clone();
let mut users: HashMap<Vec<u8>, User> = HashMap::new();
while let Some(event) = receiver.next().await {
match event {
Event::User(user) => {
users.insert(user.id.clone(), user);
}
Event::Broadcast(object) => match object::Broadcast::try_from(object.clone()) {
Ok(broadcast) => {
for user in users.values() {
for subscription in &user.subscriptions {
match &broadcast {
object::Broadcast::V4(_) => {
if let Err(_err) =
broadcast.decrypt(object.header(), &subscription)
{
continue;
}
}
object::Broadcast::V5(v5) => {
if subscription.tag() != *v5.tag() {
continue;
}
}
}
if let Err(err) = bm_event_sender
.send(BmEvent::Broadcast {
user_id: user.id.clone(),
address: subscription.clone(),
object: object.clone(),
})
.await
{
error!(target: "koibumi", "{}", err);
continue;
}
}
}
}
Err(err) => {
debug!(target: "koibumi", "{}", err);
}
},
}
}
}