embly_wrapper/
context.rs

1use {
2    crate::{
3        bimap::BidirectionalMap,
4        bytes::{as_u32_le, u32_as_u8_le},
5        error::{Error, Result},
6        protos::comms::Message,
7    },
8    log::debug,
9    protobuf::{parse_from_bytes, Message as _},
10    std::{
11        cmp,
12        collections::{HashMap, VecDeque},
13        io::prelude::*,
14        os::unix::net::UnixStream,
15        sync::mpsc::Receiver,
16        time,
17    },
18};
19
20pub struct EmblyCtx {
21    pub address_map: BidirectionalMap<i32, u64>,
22    pub address_count: i32,
23    pub address: u64,
24    parent_address: u64,
25    pending_events: Vec<i32>,
26    receiver: Receiver<Message>,
27    read_buffers: HashMap<i32, VecDeque<Message>>,
28    stream_writer: UnixStream,
29}
30
31impl EmblyCtx {
32    pub fn new(
33        receiver: Receiver<Message>,
34        stream_writer: UnixStream,
35        address: u64,
36        parent_address: u64,
37    ) -> Self {
38        let address_map = BidirectionalMap::new();
39        let mut ctx = Self {
40            receiver,
41            stream_writer,
42            address_map,
43            address,
44            parent_address,
45            address_count: 0,
46            read_buffers: HashMap::new(),
47            pending_events: Vec::new(),
48        };
49        ctx.add_address(parent_address);
50        ctx
51    }
52
53    pub fn write(&mut self, id: i32, buf: &[u8]) -> Result<usize> {
54        let mut msg = Message::new();
55        msg.set_to(
56            *self
57                .address_map
58                .get_value(id)
59                .ok_or(Error::DescriptorDoesntExist)?,
60        );
61        msg.set_from(self.address);
62        msg.set_data(buf.to_vec());
63        self.write_msg(msg)?;
64        Ok(buf.len())
65    }
66
67    pub fn read(&mut self, id: i32, buf: &mut [u8]) -> Result<usize> {
68        self.process_messages(Some(time::Duration::new(0, 0)))?;
69
70        if let Some(queue) = self.read_buffers.get_mut(&id) {
71            if queue.is_empty() {
72                return Ok(0);
73            }
74            let msg = queue.get_mut(0).expect("there should be something here");
75            if msg.error != 0 {
76                // TODO: actually create the correct io error
77                return Err(Error::Io(std::io::Error::from(
78                    std::io::ErrorKind::AddrNotAvailable,
79                )));
80            }
81            let msg_data_ln = msg.get_data().len();
82            let to_drain = cmp::min(buf.len(), msg_data_ln);
83            let part: Vec<u8> = msg.mut_data().drain(..to_drain).collect();
84            buf[..to_drain].copy_from_slice(&part);
85            if msg.get_data().is_empty() {
86                queue.pop_front();
87            }
88            Ok(part.len())
89        } else {
90            println!("no buffers for id");
91            Ok(0)
92        }
93    }
94
95    fn save_msg(&mut self, msg: Message) -> Result<i32> {
96        if msg.from == 0 {
97            debug!("message has invalid from of 0 {:?}", msg)
98            // TODO: err
99        }
100        if msg.to == 0 {
101            debug!("message has invalid to of 0 {:?}", msg)
102            // TODO: err
103        }
104
105        let addr = self.add_address(msg.from);
106        debug!("save_msg_addr {:?}", (addr, msg.from));
107        if self.read_buffers.get(&addr).is_none() {
108            self.read_buffers.insert(addr, VecDeque::new());
109        }
110        let buf = self.read_buffers.get_mut(&addr).unwrap();
111        buf.push_back(msg);
112        Ok(addr)
113    }
114
115    fn process_messages(&mut self, timeout: Option<time::Duration>) -> Result<()> {
116        let mut new: Vec<Message> = self.receiver.try_iter().collect();
117
118        // if we have events we return
119        if new.is_empty() {
120            if let Some(dur) = timeout {
121                if let Ok(msg) = self.receiver.recv_timeout(dur) {
122                    new.push(msg)
123                }
124            } else {
125                // if no timeout is given we block forever
126                let msg = self.receiver.recv()?;
127                new.push(msg);
128            }
129        }
130        for msg in new.drain(..) {
131            let i = self.save_msg(msg)?;
132            self.pending_events.push(i);
133        }
134        Ok(())
135    }
136
137    pub fn events_limited(
138        &mut self,
139        timeout: Option<time::Duration>,
140        limit: usize,
141    ) -> Result<Vec<i32>> {
142        self.process_messages(timeout)?;
143        let to_drain = cmp::min(self.pending_events.len(), limit);
144        Ok(self.pending_events.drain(..to_drain).collect())
145    }
146
147    #[allow(dead_code)]
148    pub fn events(&mut self, timeout: Option<time::Duration>) -> Result<Vec<i32>> {
149        self.process_messages(timeout)?;
150        Ok(self.pending_events.drain(..).collect())
151    }
152
153    fn add_address(&mut self, addr: u64) -> i32 {
154        if let Some(k) = self.address_map.get_key(addr) {
155            return *k;
156        }
157        self.address_count += 1;
158        self.address_map.insert(self.address_count, addr);
159        self.address_count
160    }
161
162    pub fn spawn(&mut self, name: &str) -> Result<i32> {
163        let spawn_addr = rand::random::<u64>();
164        let addr = self.add_address(spawn_addr);
165
166        let mut msg = Message::new();
167        msg.set_spawn(name.to_string());
168        msg.set_to(self.parent_address);
169        msg.set_from(self.address);
170
171        msg.set_spawn_address(spawn_addr);
172
173        // TODO! for now we generate the address ourselves here. This is just the easiest
174        // because the function immediately knows where to send bytes to and the master
175        // will receive events in order and be able to sort it out. Alternatively this
176        // function would need be issued addresses to allocate or wait for a response
177
178        self.write_msg(msg)?;
179        Ok(addr)
180    }
181    fn write_msg(&mut self, msg: Message) -> Result<()> {
182        write_msg(&mut self.stream_writer, msg)
183    }
184}
185
186pub fn write_msg(stream: &mut UnixStream, msg: Message) -> Result<()> {
187    let msg_bytes = msg.write_to_bytes()?;
188    stream.write_all(&u32_as_u8_le(msg_bytes.len() as u32))?;
189    stream.write_all(&msg_bytes)?;
190    Ok(())
191}
192
193pub fn next_message(stream: &mut UnixStream) -> Result<Message> {
194    let mut size_bytes: [u8; 4] = [0; 4];
195    stream.read_exact(&mut size_bytes)?;
196    let size = as_u32_le(&size_bytes) as usize;
197    let mut read = 0;
198    if size == 0 {
199        return Ok(Message::new());
200    }
201    let mut msg_bytes = vec![0; size];
202    loop {
203        let ln = stream.read(&mut msg_bytes[read..])?;
204        read += ln;
205        debug!(
206            "reading msg {:?}",
207            (ln, msg_bytes[read..].len(), read, size)
208        );
209        if ln == 0 || read == size {
210            break;
211        }
212    }
213    let msg: Message = parse_from_bytes(&msg_bytes)?;
214    Ok(msg)
215}