wgtk/net/
bundle.rs

1//! Structures for managing bundles of packets.
2
3use std::io::{self, Write, Cursor, Read};
4use std::fmt;
5
6use thiserror::Error;
7
8use super::packet::{Packet, PACKET_FLAGS_LEN, PACKET_MAX_BODY_LEN};
9use super::element::reply::{Reply, ReplyHeader, REPLY_ID};
10use super::element::{Element, TopElement};
11
12use crate::util::io::*;
13use crate::util::BytesFmt;
14
15
16/// A bundle is a sequence of packets that are used to store elements. 
17/// Elements of various types, like regular elements, requests or 
18/// replies can be simply added and the number of packets contained in
19/// this bundle is automatically adjusted if no more space is available.
20/// 
21/// Functions that are used to add elements provide a builder-like 
22/// structure by returning a mutable reference to itself.
23#[derive(Debug)]
24pub struct Bundle {
25    /// Chain of packets.
26    packets: Vec<Box<Packet>>,
27    /// Indicate if a new packet must be added before a new message. 
28    /// It's used to avoid mixing manually-added packets with packets 
29    /// from newly inserted elements. It's mandatory because we don't 
30    /// know what `request_last_link_offset` should be from manually
31    /// added packets.
32    force_new_packet: bool,
33    /// Available length on the last packet, used to avoid borrowing issues.
34    available_len: usize,
35    /// Offset of the link of the last request, `0` if not request yet.
36    last_request_header_offset: usize,
37}
38
39impl Bundle {
40
41    /// Construct a new empty bundle, this bundle doesn't
42    /// allocate until you add the first element.
43    pub fn new() -> Bundle {
44        Self::with_multiple(vec![])
45    }
46
47    /// Create a new bundle with one predefined packet.
48    pub fn with_single(packet: Box<Packet>) -> Self {
49        Self::with_multiple(vec![packet])
50    }
51
52    /// Create a new bundle with multiple predefined packets.
53    pub fn with_multiple(packets: Vec<Box<Packet>>) -> Self {
54        Self {
55            available_len: packets.last().map(|p| p.content_available_len()).unwrap_or(0),
56            packets,
57            force_new_packet: true,
58            last_request_header_offset: 0,
59        }
60    }
61
62    /// See [`BundleElementReader`].
63    pub fn element_reader(&self) -> BundleElementReader<'_> {
64        BundleElementReader::new(self)
65    }
66
67    /// See [`BundleElementWriter`].
68    pub fn element_writer(&mut self) -> BundleElementWriter<'_> {
69        BundleElementWriter::new(self)
70    }
71
72    /// Return the number of packets in this bundle.
73    #[inline]
74    pub fn len(&self) -> usize {
75        self.packets.len()
76    }
77
78    #[inline]
79    pub fn is_empty(&self) -> bool {
80        self.packets.is_empty()
81    }
82
83    /// Clear the bundle by removing all packets.
84    pub fn clear(&mut self) {
85        self.packets.clear();
86        self.force_new_packet = true;
87        self.available_len = 0;
88        self.last_request_header_offset = 0;
89    }
90
91    /// Get a slice of all packets of this bundle.
92    #[inline]
93    pub fn packets(&self) -> &[Box<Packet>] {
94        &self.packets[..]
95    }
96
97    #[inline]
98    pub fn packets_mut(&mut self) -> &mut [Box<Packet>] {
99        &mut self.packets[..]
100    }
101
102    /// Internal method to add a new packet at the end of the chain.
103    fn add_packet(&mut self) {
104        let packet = Packet::new_boxed();
105        self.available_len = packet.content_available_len();
106        self.packets.push(packet);
107        self.last_request_header_offset = 0;
108        self.force_new_packet = false;
109    }
110
111    /// Internal method to add a a new packet only if forced.
112    fn add_packet_if_forced(&mut self) {
113        if self.force_new_packet {
114            self.add_packet();
115        }
116    }
117
118    /// Reserve exactly the given length in the current packet or a new one if
119    /// such space is not available in the current packet. **Given length must 
120    /// not exceed maximum packet size.**
121    /// 
122    /// This function is currently only used for writing the element's header.
123    fn reserve_exact(&mut self, len: usize) -> &mut [u8] {
124        debug_assert!(len <= PACKET_MAX_BODY_LEN);
125        let new_packet = self.available_len < len;
126        if new_packet {
127            self.add_packet();
128        }
129        let packet = self.packets.last_mut().unwrap();
130        self.available_len -= len;
131        packet.grow(len)
132    }
133
134    /// Reserve up to the given length in the current packet, if no byte is
135    /// available in the current packet, a new packet is created. The final
136    /// reserved length is the size of the returned slice.
137    fn reserve(&mut self, len: usize) -> &mut [u8] {
138        let new_packet = self.available_len == 0;
139        if new_packet {
140            self.add_packet();
141        }
142        let packet = self.packets.last_mut().unwrap();
143        let len = len.min(self.available_len);
144        self.available_len -= len;
145        packet.grow(len)
146    }
147
148}
149
150
151/// An internal writer implementation used to append data to a bundle,
152/// adding packets if needed.
153struct BundleWriter<'a> {
154    bundle: &'a mut Bundle,
155}
156
157impl<'a> BundleWriter<'a> {
158
159    /// Construct a new bundle writer, must be constructed only if at least one packet
160    /// is already existing in the bundle.
161    fn new(bundle: &'a mut Bundle) -> Self {
162        Self { bundle }
163    }
164
165}
166
167impl<'a> Write for BundleWriter<'a> {
168
169    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
170        let slice = self.bundle.reserve(buf.len());
171        slice.copy_from_slice(&buf[..slice.len()]);
172        Ok(slice.len())
173    }
174
175    fn flush(&mut self) -> io::Result<()> {
176        Ok(())
177    }
178
179}
180
181
182/// A simple reader for bundle that join all packet's bodies into
183/// a single stream. This is internally used by [`BundleElementReader`] 
184/// for reading elements and replies.
185/// 
186/// *Note that it implements clone in order to save advancement of
187/// the reader and allowing rollbacks.*
188#[derive(Clone)]
189struct BundleReader<'a> {
190    /// The current packet with the remaining ones.
191    packets: &'a [Box<Packet>],
192    /// The remaining body data in the current packet.
193    body: &'a [u8],
194    /// The current position of the reader, used for requests.
195    pos: usize,
196}
197
198impl<'a> BundleReader<'a> {
199
200    fn new(bundle: &'a Bundle) -> Self {
201        let packets = bundle.packets();
202        Self {
203            packets,
204            body: packets.get(0)
205                .map(|p| p.content())
206                .unwrap_or(&[]),
207            pos: 0,
208        }
209    }
210
211    /// Internal function to get a reference to the current packet.
212    fn packet(&self) -> Option<&'a Packet> {
213        self.packets.get(0).map(|b| &**b)
214    }
215
216    /// Internal function that ensures that the body is not empty.
217    /// If empty, it search for the next non-empty packet and return.
218    /// 
219    /// It returns true if the operation was successful, false otherwise.
220    fn ensure(&mut self) -> bool {
221        while self.body.is_empty() {
222            if self.packets.is_empty() {
223                return false; // No more data.
224            } else {
225                // Discard the current packet from the slice.
226                self.packets = &self.packets[1..];
227                // And if there is one packet, set the body from this packet.
228                if let Some(p) = self.packets.get(0) {
229                    self.body = p.content();
230                }
231            }
232        }
233        true
234    }
235
236    /// Internal function to goto a given position in the bundle.
237    /// 
238    /// *The given position is checked to be past the current one.*
239    fn goto(&mut self, pos: usize) {
240        assert!(pos >= self.pos, "given pos is lower than current pos");
241        let mut remaining = pos - self.pos;
242        while remaining != 0 && self.ensure() {
243            let len = self.body.len().min(remaining);
244            self.pos += len;
245            remaining -= len;
246        }
247    }
248
249}
250
251impl<'a> Read for BundleReader<'a> {
252
253    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
254
255        if !self.ensure() {
256            return Ok(0);
257        }
258
259        let len = buf.len().min(self.body.len());
260        buf[..len].copy_from_slice(&self.body[..len]);
261
262        self.body = &self.body[len..];
263        self.pos += len;
264
265        Ok(len)
266
267    }
268
269}
270
271impl fmt::Debug for BundleReader<'_> {
272    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273        f.debug_struct("BundleReader")
274            .field("packets", &self.packets)
275            .field("body", &format_args!("{:X}", BytesFmt(self.body)))
276            .field("pos", &self.pos)
277            .finish()
278    }
279}
280
281
282/// The full description of an element being read or to be written.
283/// Including its numeric identifier (0xFF if reply), the element
284/// itself and the optional request id.
285#[derive(Debug, Clone, PartialEq, Eq)]
286pub struct BundleElement<E> {
287    /// Numeric identifier of the element.
288    pub id: u8,
289    /// The actual element.
290    /// TODO: Rename to "payload"
291    pub element: E,
292    /// The request ID if the element is a request. Not to be confused 
293    /// with the reply ID if the element is a `Reply`.
294    pub request_id: Option<u32>
295}
296
297impl<E> BundleElement<E> {
298
299    /// Map this read element's type into another one with the given 
300    /// closure.
301    pub fn map<U, F: FnOnce(E) -> U>(self, f: F) -> BundleElement<U> {
302        BundleElement { 
303            id: self.id, 
304            element: f(self.element), 
305            request_id: self.request_id
306        }
307    }
308
309}
310
311impl<E: Element> From<BundleElement<Reply<E>>> for BundleElement<E> {
312    fn from(read: BundleElement<Reply<E>>) -> Self {
313        BundleElement {
314            id: REPLY_ID,
315            element: read.element.element,
316            request_id: read.request_id
317        }
318    }
319}
320
321
322/// The structure used to write elements to a bundle. This structure
323pub struct BundleElementWriter<'a> {
324    bundle: &'a mut Bundle,
325}
326
327impl<'a> BundleElementWriter<'a> {
328
329    fn new(bundle: &'a mut Bundle) -> Self {
330        Self {
331            bundle,
332        }
333    }
334
335    /// Add an element to this bundle.
336    #[inline]
337    pub fn write<E: TopElement>(&mut self, id: u8, element: E, config: &E::Config) {
338        self.write_raw(BundleElement { id, element, request_id: None }, config)
339    }
340
341    /// Add a simple element to this bundle. Such elements have no config.
342    #[inline]
343    pub fn write_simple<E: TopElement<Config = ()>>(&mut self, id: u8, element: E) {
344        self.write(id, element, &())
345    }
346
347    /// Add a request element to this bundle, with a given request ID.
348    #[inline]
349    pub fn write_request<E: TopElement>(&mut self, id: u8, element: E, config: &E::Config, request_id: u32) {
350        self.write_raw(BundleElement { id, element, request_id: Some(request_id) }, config)
351    }
352
353    /// Add a request element to this bundle, with a given request ID. 
354    /// Such elements have no config.
355    #[inline]
356    pub fn write_simple_request<E: TopElement<Config = ()>>(&mut self, id: u8, element: E, request_id: u32) {
357        self.write_request(id, element, &(), request_id)
358    }
359
360    /// Add a reply element to this bundle, for a given request ID.
361    /// 
362    /// Such elements are special and don't require an ID, because they 
363    /// are always of  a 32-bit variable length and prefixed with the 
364    /// request ID.
365    #[inline]
366    pub fn write_reply<E: Element>(&mut self, element: E, config: &E::Config, request_id: u32) {
367        self.write(REPLY_ID, Reply::new(request_id, element), config)
368    }
369
370    /// Add a reply element to this bundle, for a given request ID.
371    /// Such elements have no config.
372    #[inline]
373    pub fn write_simple_reply<E: Element<Config = ()>>(&mut self, element: E, request_id: u32) {
374        self.write_reply(element, &(), request_id)
375    }
376
377    /// Raw method to add an element to this bundle, given an ID, the 
378    /// element and its config. With an optional request ID.
379    pub fn write_raw<E: TopElement>(&mut self, element: BundleElement<E>, config: &E::Config) {
380
381        self.bundle.add_packet_if_forced();
382
383        const REQUEST_HEADER_LEN: usize = 6;
384
385        // Allocate element's header, +1 for element's ID, +6 reply_id and link offset.
386        let header_len = 1 + E::LEN.len() + if element.request_id.is_some() { REQUEST_HEADER_LEN } else { 0 };
387        let header_slice = self.bundle.reserve_exact(header_len);
388        header_slice[0] = element.id;
389
390        if let Some(request_id) = element.request_id {
391            let mut request_header_cursor = Cursor::new(&mut header_slice[header_len - 6..]);
392            request_header_cursor.write_u32(request_id).unwrap();
393            request_header_cursor.write_u16(0).unwrap(); // Next request offset set to null.
394        }
395
396        // Keep the packet index to rewrite the packet's length after writing it.
397        let cur_packet_idx = self.bundle.packets.len() - 1;
398
399        // IMPORTANT: All offsets are in the content, not the raw body or raw data.
400        let cur_packet = &mut self.bundle.packets[cur_packet_idx];
401        let cur_packet_len = cur_packet.content_len();
402        let cur_packet_elt_offset = cur_packet_len - header_len;
403
404        // NOTE: We add flags length to element offset because offset contains flags.
405        if element.request_id.is_some() {
406        
407            if self.bundle.last_request_header_offset == 0 {
408                // If there is no previous request, we set the first request offset.
409                cur_packet.set_first_request_offset(PACKET_FLAGS_LEN + cur_packet_elt_offset);
410            } else {
411                // Add 4 because first 4 bytes is the request id.
412                Cursor::new(&mut cur_packet.content_mut()[self.bundle.last_request_header_offset + 4..])
413                    .write_u16((PACKET_FLAGS_LEN + cur_packet_elt_offset) as u16).unwrap();
414            }
415
416            // We keep the offset of the request header, it will be used if a request
417            // element is added after this one so we can write the link to the next.
418            self.bundle.last_request_header_offset = cur_packet_len - REQUEST_HEADER_LEN;
419            
420        }
421
422        // Write the actual element's content.
423        let mut writer = IoCounter::new(BundleWriter::new(&mut *self.bundle));
424        // For now we just unwrap the encode result, because no IO error should be produced by a BundleWriter.
425        element.element.encode(&mut writer, config).unwrap();
426        let length = writer.count() as u32;
427
428        // Finally write id and length, we can unwrap because we know that enough length is available.
429        let header_slice = &mut self.bundle.packets[cur_packet_idx].content_mut()[cur_packet_elt_offset..];
430        E::LEN.write(Cursor::new(&mut header_slice[1..]), length).unwrap();
431
432    }
433
434}
435
436
437/// The structure used to iterate over a bundle's elements, providing
438/// a developer-friendly API that automatically handle reply elements.
439/// 
440/// This structure can be obtained from [`Bundle::element_reader`].
441pub struct BundleElementReader<'a> {
442    bundle_reader: BundleReader<'a>,
443    next_request_offset: usize
444}
445
446impl<'a> BundleElementReader<'a> {
447
448    /// Internal constructor used by [`Bundle`] to create the reader.
449    fn new(bundle: &'a Bundle) -> Self {
450        let bundle_reader = BundleReader::new(bundle);
451        Self {
452            next_request_offset: bundle_reader.packet()
453                .map(|p| p.first_request_offset().unwrap_or(0))
454                .unwrap_or(0),
455            bundle_reader
456        }
457    }
458
459    /// Return `true` if the current element is a request, this is just dependent of
460    /// the current position within the current packet.
461    pub fn is_request(&self) -> bool {
462        // Get the real data pos (instead of the body pos).
463        let data_pos = self.bundle_reader.pos + PACKET_FLAGS_LEN;
464        self.next_request_offset != 0 && data_pos == self.next_request_offset
465    }
466
467    /// Read the current element's identifier. This call return the same result until
468    /// you explicitly choose to go to the next element while reading the element
469    pub fn next_id(&self) -> Option<u8> {
470        self.bundle_reader.body.get(0).copied()
471    }
472
473    /// Read the current element, return a guard that you should use a codec to decode
474    /// the element depending on its type with. *This is a simpler version to use over
475    /// standard `read_element` method because it handle reply elements for you.*
476    pub fn next_element(&mut self) -> Option<ElementReader<'_, 'a>> {
477        match self.next_id() {
478            Some(REPLY_ID) => {
479                match self.read_element::<ReplyHeader>(&(), false) {
480                    Ok(elt) => {
481                        debug_assert!(elt.request_id.is_none(), "Replies should not be request at the same time.");
482                        Some(ElementReader::Reply(ReplyElementReader(self, elt.element.request_id)))
483                    }
484                    Err(_) => None
485                }
486            }
487            Some(id) => {
488                Some(ElementReader::Top(TopElementReader(self, id)))
489            }
490            None => None
491        }
492    }
493
494    /// Try to decode the current element using a given codec. You can choose to go
495    /// to the next element using the `next` argument.
496    pub fn read_element<E>(&mut self, config: &E::Config, next: bool) -> BundleResult<BundleElement<E>>
497    where
498        E: TopElement
499    {
500
501        let request = self.is_request();
502        let header_len = E::LEN.len() + 1 + if request { 6 } else { 0 };
503
504        if self.bundle_reader.body.len() < header_len {
505            return Err(BundleError::TooShort)
506        }
507
508        // We store a screenshot of the reader in order to be able to rollback in case of error.
509        let reader_save = self.bundle_reader.clone();
510
511        match self.read_element_internal::<E>(config, next, request) {
512            Ok(elt) if next => Ok(elt),
513            Ok(elt) => {
514                // If no error but we don't want to go next.
515                self.bundle_reader.clone_from(&reader_save);
516                Ok(elt)
517            }
518            Err(e) => {
519                // If any error happens, we cancel the operation.
520                self.bundle_reader.clone_from(&reader_save);
521                Err(BundleError::Io(e))
522            }
523        }
524
525    }
526
527    /// Internal only. Used by `next` to wrap all IO errors and reset seek if an error happens.
528    #[inline(always)]
529    fn read_element_internal<E>(&mut self, config: &E::Config, next: bool, request: bool) -> io::Result<BundleElement<E>>
530    where
531        E: TopElement
532    {
533
534        let start_packet = self.bundle_reader.packet().unwrap();
535
536        let elt_id = self.bundle_reader.read_u8()?;
537        let elt_len = E::LEN.read(&mut self.bundle_reader, elt_id)?;
538
539        let reply_id = if request {
540            let reply_id = self.bundle_reader.read_u32()?;
541            self.next_request_offset = self.bundle_reader.read_u16()? as usize;
542            Some(reply_id)
543        } else {
544            None
545        };
546
547        let elt_data_begin = self.bundle_reader.pos;
548
549        let mut elt_reader = Read::take(&mut self.bundle_reader, elt_len as u64);
550        let element = E::decode(&mut elt_reader, elt_len as usize, config)?;
551
552        // We seek to the end only if we want to go next.
553        if next {
554
555            self.bundle_reader.goto(elt_data_begin + elt_len as usize);
556
557            // Here we check if we have changed packets during decoding of the element.
558            // If changed, we change the next request offset.
559            match self.bundle_reader.packet() {
560                Some(end_packet) => {
561                    if !std::ptr::eq(start_packet, end_packet) {
562                        self.next_request_offset = end_packet.first_request_offset().unwrap_or(0);
563                    }
564                    // Else, we are still in the same packet so we don't need to change this.
565                }
566                None => self.next_request_offset = 0
567            }
568
569        }
570
571        Ok(BundleElement {
572            id: elt_id,
573            element,
574            request_id: reply_id
575        })
576
577    }
578
579}
580
581impl fmt::Debug for BundleElementReader<'_> {
582    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
583        f.debug_struct("BundleElementReader")
584            .field("bundle_reader", &self.bundle_reader)
585            .field("next_request_offset", &self.next_request_offset)
586            .field("next_id()", &self.next_id())
587            .field("is_request()", &self.is_request())
588            .finish()
589    }
590}
591
592/// Bundle element variant iterated from `BundleElementIter`.
593/// This enum provides a better way to read replies using sub codecs.
594#[derive(Debug)]
595pub enum ElementReader<'reader, 'bundle> {
596    /// A top element with a proper ID and a reader.
597    Top(TopElementReader<'reader, 'bundle>),
598    /// A reply element with request ID and a reader.
599    Reply(ReplyElementReader<'reader, 'bundle>)
600}
601
602impl ElementReader<'_, '_> {
603
604    /// Return `true` if this element is a simple one.
605    pub fn is_simple(&self) -> bool {
606        matches!(self, ElementReader::Top(_))
607    }
608
609    /// Return `true` if this element is a reply.
610    pub fn is_reply(&self) -> bool {
611        matches!(self, ElementReader::Reply(_))
612    }
613
614}
615
616/// The simple variant of element, provides direct decoding using a codec.
617#[derive(Debug)]
618pub struct TopElementReader<'reader, 'bundle>(&'reader mut BundleElementReader<'bundle>, u8);
619
620impl TopElementReader<'_, '_> {
621
622    /// Get the numeric identifier of the element being read.
623    #[inline]
624    pub fn id(&self) -> u8 {
625        self.1
626    }
627
628    /// Same as `read` but never go to the next element *(this is why this method doesn't take
629    /// self by value)*.
630    pub fn read_stable<E: TopElement>(&mut self, config: &E::Config) -> BundleResult<BundleElement<E>> {
631        self.0.read_element(config, false)
632    }
633
634    #[inline]
635    pub fn read_simple_stable<E: TopElement<Config = ()>>(&mut self) -> BundleResult<BundleElement<E>> {
636        self.read_stable::<E>(&())
637    }
638
639    /// Read the element using the given codec. This method take self by value and automatically
640    /// go the next element if read is successful, if not successful you will need to call
641    /// `Bundle::next_element` again.
642    pub fn read<E: TopElement>(self, config: &E::Config) -> BundleResult<BundleElement<E>> {
643        self.0.read_element(config, true)
644    }
645
646    #[inline]
647    pub fn read_simple<E: TopElement<Config = ()>>(self) -> BundleResult<BundleElement<E>> {
648        self.read::<E>(&())
649    }
650
651}
652
653/// The reply variant of element, provides a way to read replies and get `Reply` elements
654/// containing the final element.
655#[derive(Debug)]
656pub struct ReplyElementReader<'reader, 'bundle>(&'reader mut BundleElementReader<'bundle>, u32);
657
658impl<'reader, 'bundle> ReplyElementReader<'reader, 'bundle> {
659
660    /// Get the request id this reply is for.
661    #[inline]
662    pub fn request_id(&self) -> u32 {
663        self.1
664    }
665
666    /// Same as `read` but never go to the next element *(this is why this method doesn't take
667    /// self by value)*.
668    ///
669    /// This method doesn't returns the reply element but the final element.
670    pub fn read_stable<E: Element>(&mut self, config: &E::Config) -> BundleResult<BundleElement<E>> {
671        self.0.read_element::<Reply<E>>(config, false).map(Into::into)
672    }
673
674    #[inline]
675    pub fn read_simple_stable<E: Element<Config = ()>>(&mut self) -> BundleResult<BundleElement<E>> {
676        self.read_stable::<E>(&())
677    }
678
679    /// Read the reply element using the given codec. This method take self by value and
680    /// automatically go the next element if read is successful, if not successful you
681    /// will need to call `Bundle::next_element` again.
682    ///
683    /// This method doesn't returns the reply element but the final element.
684    pub fn read<E: Element>(self, config: &E::Config) -> BundleResult<BundleElement<E>> {
685        self.0.read_element::<Reply<E>>(config, true).map(Into::into)
686    }
687
688    #[inline]
689    pub fn read_simple<E: Element<Config = ()>>(self) -> BundleResult<BundleElement<E>> {
690        self.read::<E>(&())
691    }
692
693}
694
695
696/// Standard errors that can happen while interacting with bundles.
697#[derive(Debug, Error)]
698pub enum BundleError {
699    #[error("bundle is too short for reading element")]
700    TooShort,
701    /// IO error while interacting with the packet.
702    #[error("io error: {0}")]
703    Io(#[from] io::Error),
704}
705
706/// Common alias for standard bundle errors [`BundleError`].
707pub type BundleResult<T> = Result<T, BundleError>;