1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use cerk::kernel::{BrokerEvent, CloudEventRoutingArgs, Config, IncomingCloudEvent};
use cerk::runtime::channel::{BoxedReceiver, BoxedSender};
use cerk::runtime::InternalServerId;
use cloudevents::Event;
use serde_json;
use std::io::{BufRead, BufReader};
use std::os::unix::net::{UnixListener, UnixStream};
use std::time::Duration;
/// Serves a single client connection and forwards every received line to the
/// kernel as a CloudEvent.
///
/// * `id` - id of this port; used in log messages and as both `routing_id`
///   and `incoming_id` of the forwarded events.
/// * `listener` - socket on which a new client is accepted when `stream` is
///   `None`.
/// * `stream` - an already connected reader, or `None` to accept a new client
///   first (this blocks in `accept` unless the listener was made
///   non-blocking by the caller).
/// * `sender_to_kernel` - channel on which parsed events are sent.
/// * `max_tries` - remaining attempts; every accepted connection consumes one.
///
/// Each line is parsed as a JSON `Event`. Lines that fail to parse are logged
/// and skipped. Reading stops at end-of-stream or on the first read error; the
/// connection is then dropped.
///
/// # Returns
///
/// Always `None`: the connection is never handed back to the caller, so the
/// next call accepts a fresh client.
///
/// # Panics
///
/// * if `max_tries` is `0` on entry (including the re-entry after an accept);
/// * if accepting a connection on `listener` fails.
fn liten_to_stream(
    id: &InternalServerId,
    listener: &UnixListener,
    stream: Option<BufReader<UnixStream>>,
    sender_to_kernel: &BoxedSender,
    max_tries: usize,
) -> Option<BufReader<UnixStream>> {
    if max_tries == 0 {
        panic!("too many failures while trying to connect to stream");
    }
    debug!("listen to stream...");

    let mut reader = match stream {
        Some(reader) => reader,
        None => match listener.accept() {
            Ok((socket, _)) => {
                // Re-enter with the fresh connection; this spends one try.
                return liten_to_stream(
                    id,
                    listener,
                    Some(BufReader::new(socket)),
                    sender_to_kernel,
                    max_tries - 1,
                );
            }
            // Format the error explicitly: a bare `panic!(err)` carries an
            // opaque payload and is rejected by the 2021 edition.
            Err(err) => panic!("{} failed to accept connection: {:?}", id, err),
        },
    };

    // One buffer reused for every line to avoid an allocation per message.
    let mut line = String::new();
    loop {
        match reader.read_line(&mut line) {
            // Zero bytes read means the peer closed the connection.
            Ok(0) => break,
            Ok(_) => {
                debug!("{} received new line", id);
                match serde_json::from_str::<Event>(&line) {
                    Ok(cloud_event) => {
                        debug!("{} deserialized event successfully", id);
                        sender_to_kernel.send(BrokerEvent::IncomingCloudEvent(
                            IncomingCloudEvent {
                                routing_id: id.clone(),
                                incoming_id: id.clone(),
                                cloud_event,
                                args: CloudEventRoutingArgs::default(),
                            },
                        ));
                    }
                    Err(err) => {
                        error!("{} while converting string to CloudEvent: {:?}", id, err);
                    }
                }
            }
            Err(err) => {
                error!("{} read_line error {:?}", id, err);
                break;
            }
        }
        // `read_line` appends, so reset the buffer before the next message.
        line.clear();
    }
    None
}
pub fn port_input_unix_socket_json_start(
id: InternalServerId,
inbox: BoxedReceiver,
sender_to_kernel: BoxedSender,
) {
info!("start input JSON over unix socket port with id {}", id);
let mut listener: Option<UnixListener> = None;
let mut stream: Option<BufReader<UnixStream>> = None;
loop {
if let Some(broker_event) = inbox.receive_timeout(Duration::from_millis(100)) {
match broker_event {
BrokerEvent::Init => {
info!("{} initiated", id);
}
BrokerEvent::ConfigUpdated(config, _) => {
info!("{} received ConfigUpdated", id);
match config {
Config::String(socket_path) => {
listener = Some(UnixListener::bind(socket_path).unwrap());
}
_ => error!("{} received invalide config", id),
};
}
broker_event => warn!("event {} not implemented", broker_event),
}
}
if let Some(listener) = listener.as_ref() {
stream = liten_to_stream(&id, listener, stream, &sender_to_kernel, 10);
}
}
}