kompact 0.11.3

Kompact is a Rust implementation of the Kompics component model combined with the Actor model.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! Frames are the core of the message transport layer, allowing applications to build
//! custom protocols atop this library.

use bytes::{Buf, BufMut};

//use bytes::IntoBuf;
use std::{self, fmt::Debug, io};

use crate::{net::buffers::ChunkLease, prelude::SessionId};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};

//use stream::StreamId;
/// Marker written as the first four bytes of every frame head; a head that does not start
/// with this value is rejected when decoding.
pub const MAGIC_NUM: u32 = 0xC0A1BA11;
// The magic number as individual bytes: 192, 161, 186, 17
/// Framehead has constant size in bytes: (magic) + (content length) + (frame type)
pub const FRAME_HEAD_LEN: u32 = 4 + 4 + 1;

/// Errors that can occur while encoding or decoding frames
#[derive(Debug)]
pub enum FramingError {
    /// Encoding error, the buffer lacks capacity
    BufferCapacity,
    /// Unknown frame type.
    UnsupportedFrameType,
    /// Invalid start of frame: the first four bytes did not match `MAGIC_NUM`.
    ///
    /// Carries the value that was read in place of the magic number, together with a copy
    /// of the bytes that followed it in the chunk being decoded.
    InvalidMagicNum((u32, Vec<u8>)),
    /// Invalid frame
    InvalidFrame,
    /// Serialisation during encode or deserialisation during decode
    SerialisationError,
    /// Unwrap error
    OptionError,
    /// No data to extract frame from (e.g. fewer bytes available than a frame head needs)
    NoData,
    /// IO errors wrapped into FramingError
    Io(std::io::Error),
}

impl From<std::io::Error> for FramingError {
    fn from(src: std::io::Error) -> Self {
        FramingError::Io(src)
    }
}

impl From<FramingError> for std::io::Error {
    fn from(_: FramingError) -> Self {
        io::Error::new(io::ErrorKind::InvalidData, "framing error")
    }
}

/// Core network frame definition
///
/// Each variant maps to the `FrameType` of the same name; `Ack` and `Bye` carry no body.
#[derive(Debug)]
pub enum Frame {
    /// Frame of Data
    Data(Data),
    /// Hello, used to initiate network channels
    Hello(Hello),
    /// Start, used to initiate network channels
    Start(Start),
    /// Ack to acknowledge that the connection is started.
    Ack(),
    /// Bye to signal that a channel is closing.
    Bye(),
}

impl Frame {
    /// Reports the [`FrameType`] tag that corresponds to this frame's variant.
    pub fn frame_type(&self) -> FrameType {
        match self {
            Self::Data(_) => FrameType::Data,
            Self::Hello(_) => FrameType::Hello,
            Self::Start(_) => FrameType::Start,
            Self::Ack() => FrameType::Ack,
            Self::Bye() => FrameType::Bye,
        }
    }

    /// Writes this frame into `dst`: first the frame head (type tag and body length), then
    /// the body of the concrete frame. `Ack` and `Bye` consist of the head only.
    pub fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError> {
        let kind = self.frame_type();
        let body_len = self.encoded_len();
        FrameHead::new(kind, body_len).encode_into(dst);

        match self {
            Self::Data(data) => data.encode_into(dst),
            Self::Hello(hello) => hello.encode_into(dst),
            Self::Start(start) => start.encode_into(dst),
            Self::Ack() | Self::Bye() => Ok(()),
        }
    }

    /// Number of bytes the body of this frame occupies when serialised (the frame head is
    /// not included); body-less frames report zero.
    pub fn encoded_len(&self) -> usize {
        match self {
            Self::Data(data) => data.encoded_len(),
            Self::Hello(hello) => hello.encoded_len(),
            Self::Start(start) => start.encoded_len(),
            Self::Ack() | Self::Bye() => 0,
        }
    }
}

/// Trait for unifying the Encode/Decode of the different FrameTypes.
///
/// The methods deal with the frame body only; the frame head is written separately by
/// `Frame::encode_into`.
pub(crate) trait FrameExt {
    /// Decodes a frame body of the implementing type from `src` and returns it wrapped in
    /// a `Frame`.
    fn decode_from(src: ChunkLease) -> Result<Frame, FramingError>;
    /// Writes the body of this frame into `dst`.
    fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError>;
    /// Number of bytes `encode_into` needs for the body of this frame.
    fn encoded_len(&self) -> usize;
}

/// Request Credits for credit-based flow-control
// NOTE(review): no `Frame` variant in the visible part of this module carries this type;
// confirm whether it is still used elsewhere.
#[derive(Debug)]
pub struct StreamRequest {
    /// How many credits are requested
    pub credit_capacity: u32,
}

/// Give Credits for credit-based flow-control
// NOTE(review): no `Frame` variant in the visible part of this module carries this type;
// confirm whether it is still used elsewhere.
#[derive(Debug)]
pub struct CreditUpdate {
    /// How many credits are given
    pub credit: u32,
}

/// Frame of Data
///
/// The body of a data frame is the raw payload buffer itself; no additional fields are
/// encoded around it.
#[derive(Debug)]
pub struct Data {
    /// The contents of the Frame
    pub payload: ChunkLease,
}

/// Hello, used to initiate network channels
///
/// Encoded as an IP version tag (`4` or `6`), the IP address and the port.
#[derive(Debug, Clone)]
pub struct Hello {
    /// The canonical address of the host saying Hello
    pub addr: SocketAddr,
}

/// Start, used to initiate network channels
///
/// Encoded as an IP version tag (`4` or `6`), the IP address, the port and the 128-bit
/// session id.
#[derive(Debug)]
pub struct Start {
    /// The canonical address of the host sending the Start message
    pub addr: SocketAddr,
    /// "Channel ID", used as a tie-breaker in mutual connection requests
    pub id: SessionId,
}

/// Byte-mappings for frame types
///
/// The discriminant of each variant is the tag byte written into the frame head.
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Ord, PartialOrd, Eq)]
pub enum FrameType {
    /// Frame of Data
    Data = 0x02,
    /// Hello, used to initiate network channels
    Hello = 0x04,
    /// Start, used to initiate network channels
    Start = 0x05,
    /// Ack to acknowledge that the connection is started.
    Ack = 0x06,
    /// Bye to signal that a channel is closing.
    Bye = 0x07,
    /// Unknown frame type; also the result of converting any unrecognised tag byte
    Unknown = 0x08,
}

/// Maps a tag byte read from the wire back to its [`FrameType`]; any byte that is not the
/// tag of a concrete frame type yields `FrameType::Unknown`.
impl From<u8> for FrameType {
    fn from(byte: u8) -> Self {
        // Every frame type that has a real wire tag; `Unknown` is the fallback only.
        const KNOWN: [FrameType; 5] = [
            FrameType::Data,
            FrameType::Hello,
            FrameType::Start,
            FrameType::Ack,
            FrameType::Bye,
        ];
        KNOWN
            .iter()
            .copied()
            .find(|kind| *kind as u8 == byte)
            .unwrap_or(FrameType::Unknown)
    }
}

/// Head of each frame
///
/// On the wire it is preceded by `MAGIC_NUM` and occupies `FRAME_HEAD_LEN` bytes in total.
#[derive(Debug)]
pub(crate) struct FrameHead {
    // Tag identifying which kind of frame follows the head.
    frame_type: FrameType,
    // Length in bytes of the frame body that follows the head.
    content_length: usize,
}

impl FrameHead {
    /// Creates a head announcing a body of `content_length` bytes of type `frame_type`.
    pub(crate) fn new(frame_type: FrameType, content_length: usize) -> Self {
        FrameHead {
            frame_type,
            content_length,
        }
    }

    /// Writes the head into `dst` in wire order: magic number (`u32`), content length
    /// (`u32`), frame type (`u8`) — `FRAME_HEAD_LEN` bytes in total.
    ///
    /// # Panics
    /// Panics if `dst` has fewer than `FRAME_HEAD_LEN` bytes of space left, or if the
    /// content length does not fit into the 32-bit length field.
    pub(crate) fn encode_into<B: BufMut>(&mut self, dst: &mut B) {
        assert!(dst.remaining_mut() >= FRAME_HEAD_LEN as usize);
        // The length field is only 32 bits wide. A silently truncated length would make
        // the receiver misinterpret the stream, so refuse to write such a head at all.
        assert!(
            self.content_length <= u32::MAX as usize,
            "frame content length {} does not fit into the u32 length field",
            self.content_length
        );
        dst.put_u32(MAGIC_NUM);
        dst.put_u32(self.content_length as u32);
        dst.put_u8(self.frame_type as u8);
    }

    /// Reads a head from `src`, consuming `FRAME_HEAD_LEN` bytes on success.
    ///
    /// An unrecognised type tag is not an error here; it decodes to `FrameType::Unknown`.
    ///
    /// # Errors
    /// - `FramingError::NoData` if `src` holds fewer than `FRAME_HEAD_LEN` bytes (nothing
    ///   is consumed in that case).
    /// - `FramingError::InvalidMagicNum` if the first four bytes are not `MAGIC_NUM`; the
    ///   error carries the value found and a copy of the rest of the current chunk.
    pub(crate) fn decode_from<B: Buf + ?Sized>(src: &mut B) -> Result<Self, FramingError> {
        if src.remaining() < (FRAME_HEAD_LEN) as usize {
            return Err(FramingError::NoData);
        }

        let magic_check = src.get_u32();
        if magic_check != MAGIC_NUM {
            eprintln!("Magic check fail: {:X}", magic_check);
            return Err(FramingError::InvalidMagicNum((
                magic_check,
                src.chunk().to_vec(),
            )));
        }

        let content_length = src.get_u32() as usize;

        let frame_type: FrameType = src.get_u8().into();
        let head = FrameHead::new(frame_type, content_length);
        Ok(head)
    }

    /// Length in bytes of the frame body announced by this head.
    pub(crate) fn content_length(&self) -> usize {
        self.content_length
    }

    /// Type of the frame announced by this head.
    pub(crate) fn frame_type(&self) -> FrameType {
        self.frame_type
    }
}

impl Hello {
    /// Builds a hello message announcing `addr` as the sender's address.
    pub fn new(addr: SocketAddr) -> Self {
        Self { addr }
    }

    /// The address carried by this hello message.
    pub fn addr(&self) -> SocketAddr {
        let Hello { addr } = self;
        *addr
    }
}

impl Start {
    /// Builds a start message announcing `addr` as the sender's address and `id` as the
    /// session identifier.
    pub fn new(addr: SocketAddr, id: SessionId) -> Self {
        Self { addr, id }
    }

    /// The address carried by this start message.
    pub fn addr(&self) -> SocketAddr {
        let Start { addr, .. } = self;
        *addr
    }

    /// The session id carried by this start message.
    pub fn id(&self) -> SessionId {
        let Start { id, .. } = self;
        *id
    }
}

impl Data {
    /// Wraps `payload` into a data frame.
    pub fn new(payload: ChunkLease) -> Self {
        Self { payload }
    }

    /// Body length of this frame, reported as the capacity of the payload buffer.
    pub(crate) fn encoded_len(&self) -> usize {
        let Data { payload } = self;
        payload.capacity()
    }

    /// Unwraps the frame, handing the raw payload buffer to the caller.
    pub(crate) fn payload(self) -> ChunkLease {
        let Data { payload } = self;
        payload
    }
}

impl FrameExt for Data {
    /// Wraps `payload` in a `Frame::Data` without inspecting or copying it.
    fn decode_from(payload: ChunkLease) -> Result<Frame, FramingError> {
        let data_frame = Data { payload };
        Ok(Frame::Data(data_frame))
    }

    /// Copies all remaining payload bytes into `dst`.
    ///
    /// This _copies_ the bytes rather than handing over the buffer, and it drains
    /// `self.payload` in the process, so prefer a zero-copy path for bulk data.
    ///
    /// # Panics
    /// Panics if `dst` has less space left than `self.encoded_len()` reports.
    fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError> {
        assert!(dst.remaining_mut() >= (self.encoded_len()));
        while self.payload.has_remaining() {
            let chunk = self.payload.chunk();
            let copied = chunk.len();
            dst.put_slice(chunk);
            // `chunk()` only peeks at the buffer. The cursor must be advanced explicitly,
            // otherwise the same bytes are copied over and over and the loop never ends.
            self.payload.advance(copied);
        }
        Ok(())
    }

    /// Length of the payload's current contiguous chunk.
    // NOTE(review): the inherent `Data::encoded_len` reports `payload.capacity()` instead
    // and is the one picked by plain method-call syntax; confirm which of the two is the
    // intended body length.
    fn encoded_len(&self) -> usize {
        self.payload.chunk().len()
    }
}

impl FrameExt for Hello {
    fn decode_from(mut src: ChunkLease) -> Result<Frame, FramingError> {
        match src.get_u8() {
            4 => {
                let ip = Ipv4Addr::from(src.get_u32());
                let port = src.get_u16();
                let addr = SocketAddr::new(IpAddr::V4(ip), port);
                Ok(Frame::Hello(Hello::new(addr)))
            }
            6 => {
                let ip = Ipv6Addr::from(src.get_u128());
                let port = src.get_u16();
                let addr = SocketAddr::new(IpAddr::V6(ip), port);
                Ok(Frame::Hello(Hello::new(addr)))
            }
            _ => {
                panic!("Faulty Hello Message!");
            }
        }
    }

    fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError> {
        match self.addr {
            SocketAddr::V4(v4) => {
                dst.put_u8(4); // version
                dst.put_slice(&v4.ip().octets()); // ip
                dst.put_u16(v4.port()); // port
                Ok(())
            }
            SocketAddr::V6(v6) => {
                dst.put_u8(6); // version
                dst.put_slice(&v6.ip().octets()); // ip
                dst.put_u16(v6.port()); // port
                Ok(())
            }
        }
    }

    fn encoded_len(&self) -> usize {
        match self.addr {
            SocketAddr::V4(_v4) => {
                1 + 4 + 2 // version + ip + port
            }
            SocketAddr::V6(_v6) => {
                1 + 16 + 2 // version + ip + port
            }
        }
    }
}

impl FrameExt for Start {
    fn decode_from(mut src: ChunkLease) -> Result<Frame, FramingError> {
        match src.get_u8() {
            4 => {
                let ip = Ipv4Addr::from(src.get_u32());
                let port = src.get_u16();
                let addr = SocketAddr::new(IpAddr::V4(ip), port);
                let id = SessionId::from_u128(src.get_u128());
                Ok(Frame::Start(Start::new(addr, id)))
            }
            6 => {
                let ip = Ipv6Addr::from(src.get_u128());
                let port = src.get_u16();
                let addr = SocketAddr::new(IpAddr::V6(ip), port);
                let id = SessionId::from_u128(src.get_u128());
                Ok(Frame::Start(Start::new(addr, id)))
            }
            _ => {
                panic!("Faulty Hello Message!");
            }
        }
    }

    fn encode_into<B: BufMut>(&mut self, dst: &mut B) -> Result<(), FramingError> {
        match self.addr {
            SocketAddr::V4(v4) => {
                dst.put_u8(4); // version
                dst.put_slice(&v4.ip().octets()); // ip
                dst.put_u16(v4.port()); // port
                dst.put_u128(self.id.as_u128()); //id
                Ok(())
            }
            SocketAddr::V6(v6) => {
                dst.put_u8(6); // version
                dst.put_slice(&v6.ip().octets()); // ip
                dst.put_u16(v6.port()); // port
                dst.put_u128(self.id.as_u128()); //id
                Ok(())
            }
        }
    }

    fn encoded_len(&self) -> usize {
        match self.addr {
            SocketAddr::V4(_v4) => {
                1 + 4 + 2 + 16 // version + ip + port + uuid
            }
            SocketAddr::V6(_v6) => {
                1 + 16 + 2 + 16 // version + ip + port + uuid
            }
        }
    }
}