1use {
2 crate::{
3 bimap::BidirectionalMap,
4 bytes::{as_u32_le, u32_as_u8_le},
5 error::{Error, Result},
6 protos::comms::Message,
7 },
8 log::debug,
9 protobuf::{parse_from_bytes, Message as _},
10 std::{
11 cmp,
12 collections::{HashMap, VecDeque},
13 io::prelude::*,
14 os::unix::net::UnixStream,
15 sync::mpsc::Receiver,
16 time,
17 },
18};
19
20pub struct EmblyCtx {
21 pub address_map: BidirectionalMap<i32, u64>,
22 pub address_count: i32,
23 pub address: u64,
24 parent_address: u64,
25 pending_events: Vec<i32>,
26 receiver: Receiver<Message>,
27 read_buffers: HashMap<i32, VecDeque<Message>>,
28 stream_writer: UnixStream,
29}
30
31impl EmblyCtx {
32 pub fn new(
33 receiver: Receiver<Message>,
34 stream_writer: UnixStream,
35 address: u64,
36 parent_address: u64,
37 ) -> Self {
38 let address_map = BidirectionalMap::new();
39 let mut ctx = Self {
40 receiver,
41 stream_writer,
42 address_map,
43 address,
44 parent_address,
45 address_count: 0,
46 read_buffers: HashMap::new(),
47 pending_events: Vec::new(),
48 };
49 ctx.add_address(parent_address);
50 ctx
51 }
52
53 pub fn write(&mut self, id: i32, buf: &[u8]) -> Result<usize> {
54 let mut msg = Message::new();
55 msg.set_to(
56 *self
57 .address_map
58 .get_value(id)
59 .ok_or(Error::DescriptorDoesntExist)?,
60 );
61 msg.set_from(self.address);
62 msg.set_data(buf.to_vec());
63 self.write_msg(msg)?;
64 Ok(buf.len())
65 }
66
67 pub fn read(&mut self, id: i32, buf: &mut [u8]) -> Result<usize> {
68 self.process_messages(Some(time::Duration::new(0, 0)))?;
69
70 if let Some(queue) = self.read_buffers.get_mut(&id) {
71 if queue.is_empty() {
72 return Ok(0);
73 }
74 let msg = queue.get_mut(0).expect("there should be something here");
75 if msg.error != 0 {
76 return Err(Error::Io(std::io::Error::from(
78 std::io::ErrorKind::AddrNotAvailable,
79 )));
80 }
81 let msg_data_ln = msg.get_data().len();
82 let to_drain = cmp::min(buf.len(), msg_data_ln);
83 let part: Vec<u8> = msg.mut_data().drain(..to_drain).collect();
84 buf[..to_drain].copy_from_slice(&part);
85 if msg.get_data().is_empty() {
86 queue.pop_front();
87 }
88 Ok(part.len())
89 } else {
90 println!("no buffers for id");
91 Ok(0)
92 }
93 }
94
95 fn save_msg(&mut self, msg: Message) -> Result<i32> {
96 if msg.from == 0 {
97 debug!("message has invalid from of 0 {:?}", msg)
98 }
100 if msg.to == 0 {
101 debug!("message has invalid to of 0 {:?}", msg)
102 }
104
105 let addr = self.add_address(msg.from);
106 debug!("save_msg_addr {:?}", (addr, msg.from));
107 if self.read_buffers.get(&addr).is_none() {
108 self.read_buffers.insert(addr, VecDeque::new());
109 }
110 let buf = self.read_buffers.get_mut(&addr).unwrap();
111 buf.push_back(msg);
112 Ok(addr)
113 }
114
115 fn process_messages(&mut self, timeout: Option<time::Duration>) -> Result<()> {
116 let mut new: Vec<Message> = self.receiver.try_iter().collect();
117
118 if new.is_empty() {
120 if let Some(dur) = timeout {
121 if let Ok(msg) = self.receiver.recv_timeout(dur) {
122 new.push(msg)
123 }
124 } else {
125 let msg = self.receiver.recv()?;
127 new.push(msg);
128 }
129 }
130 for msg in new.drain(..) {
131 let i = self.save_msg(msg)?;
132 self.pending_events.push(i);
133 }
134 Ok(())
135 }
136
137 pub fn events_limited(
138 &mut self,
139 timeout: Option<time::Duration>,
140 limit: usize,
141 ) -> Result<Vec<i32>> {
142 self.process_messages(timeout)?;
143 let to_drain = cmp::min(self.pending_events.len(), limit);
144 Ok(self.pending_events.drain(..to_drain).collect())
145 }
146
147 #[allow(dead_code)]
148 pub fn events(&mut self, timeout: Option<time::Duration>) -> Result<Vec<i32>> {
149 self.process_messages(timeout)?;
150 Ok(self.pending_events.drain(..).collect())
151 }
152
153 fn add_address(&mut self, addr: u64) -> i32 {
154 if let Some(k) = self.address_map.get_key(addr) {
155 return *k;
156 }
157 self.address_count += 1;
158 self.address_map.insert(self.address_count, addr);
159 self.address_count
160 }
161
162 pub fn spawn(&mut self, name: &str) -> Result<i32> {
163 let spawn_addr = rand::random::<u64>();
164 let addr = self.add_address(spawn_addr);
165
166 let mut msg = Message::new();
167 msg.set_spawn(name.to_string());
168 msg.set_to(self.parent_address);
169 msg.set_from(self.address);
170
171 msg.set_spawn_address(spawn_addr);
172
173 self.write_msg(msg)?;
179 Ok(addr)
180 }
181 fn write_msg(&mut self, msg: Message) -> Result<()> {
182 write_msg(&mut self.stream_writer, msg)
183 }
184}
185
186pub fn write_msg(stream: &mut UnixStream, msg: Message) -> Result<()> {
187 let msg_bytes = msg.write_to_bytes()?;
188 stream.write_all(&u32_as_u8_le(msg_bytes.len() as u32))?;
189 stream.write_all(&msg_bytes)?;
190 Ok(())
191}
192
193pub fn next_message(stream: &mut UnixStream) -> Result<Message> {
194 let mut size_bytes: [u8; 4] = [0; 4];
195 stream.read_exact(&mut size_bytes)?;
196 let size = as_u32_le(&size_bytes) as usize;
197 let mut read = 0;
198 if size == 0 {
199 return Ok(Message::new());
200 }
201 let mut msg_bytes = vec![0; size];
202 loop {
203 let ln = stream.read(&mut msg_bytes[read..])?;
204 read += ln;
205 debug!(
206 "reading msg {:?}",
207 (ln, msg_bytes[read..].len(), read, size)
208 );
209 if ln == 0 || read == size {
210 break;
211 }
212 }
213 let msg: Message = parse_from_bytes(&msg_bytes)?;
214 Ok(msg)
215}