1use std::io::{self, Write, Cursor, Read};
4use std::fmt;
5
6use thiserror::Error;
7
8use super::packet::{Packet, PACKET_FLAGS_LEN, PACKET_MAX_BODY_LEN};
9use super::element::reply::{Reply, ReplyHeader, REPLY_ID};
10use super::element::{Element, TopElement};
11
12use crate::util::io::*;
13use crate::util::BytesFmt;
14
15
16#[derive(Debug)]
24pub struct Bundle {
25 packets: Vec<Box<Packet>>,
27 force_new_packet: bool,
33 available_len: usize,
35 last_request_header_offset: usize,
37}
38
39impl Bundle {
40
41 pub fn new() -> Bundle {
44 Self::with_multiple(vec![])
45 }
46
47 pub fn with_single(packet: Box<Packet>) -> Self {
49 Self::with_multiple(vec![packet])
50 }
51
52 pub fn with_multiple(packets: Vec<Box<Packet>>) -> Self {
54 Self {
55 available_len: packets.last().map(|p| p.content_available_len()).unwrap_or(0),
56 packets,
57 force_new_packet: true,
58 last_request_header_offset: 0,
59 }
60 }
61
62 pub fn element_reader(&self) -> BundleElementReader<'_> {
64 BundleElementReader::new(self)
65 }
66
67 pub fn element_writer(&mut self) -> BundleElementWriter<'_> {
69 BundleElementWriter::new(self)
70 }
71
72 #[inline]
74 pub fn len(&self) -> usize {
75 self.packets.len()
76 }
77
78 #[inline]
79 pub fn is_empty(&self) -> bool {
80 self.packets.is_empty()
81 }
82
83 pub fn clear(&mut self) {
85 self.packets.clear();
86 self.force_new_packet = true;
87 self.available_len = 0;
88 self.last_request_header_offset = 0;
89 }
90
91 #[inline]
93 pub fn packets(&self) -> &[Box<Packet>] {
94 &self.packets[..]
95 }
96
97 #[inline]
98 pub fn packets_mut(&mut self) -> &mut [Box<Packet>] {
99 &mut self.packets[..]
100 }
101
102 fn add_packet(&mut self) {
104 let packet = Packet::new_boxed();
105 self.available_len = packet.content_available_len();
106 self.packets.push(packet);
107 self.last_request_header_offset = 0;
108 self.force_new_packet = false;
109 }
110
111 fn add_packet_if_forced(&mut self) {
113 if self.force_new_packet {
114 self.add_packet();
115 }
116 }
117
118 fn reserve_exact(&mut self, len: usize) -> &mut [u8] {
124 debug_assert!(len <= PACKET_MAX_BODY_LEN);
125 let new_packet = self.available_len < len;
126 if new_packet {
127 self.add_packet();
128 }
129 let packet = self.packets.last_mut().unwrap();
130 self.available_len -= len;
131 packet.grow(len)
132 }
133
134 fn reserve(&mut self, len: usize) -> &mut [u8] {
138 let new_packet = self.available_len == 0;
139 if new_packet {
140 self.add_packet();
141 }
142 let packet = self.packets.last_mut().unwrap();
143 let len = len.min(self.available_len);
144 self.available_len -= len;
145 packet.grow(len)
146 }
147
148}
149
150
151struct BundleWriter<'a> {
154 bundle: &'a mut Bundle,
155}
156
157impl<'a> BundleWriter<'a> {
158
159 fn new(bundle: &'a mut Bundle) -> Self {
162 Self { bundle }
163 }
164
165}
166
167impl<'a> Write for BundleWriter<'a> {
168
169 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
170 let slice = self.bundle.reserve(buf.len());
171 slice.copy_from_slice(&buf[..slice.len()]);
172 Ok(slice.len())
173 }
174
175 fn flush(&mut self) -> io::Result<()> {
176 Ok(())
177 }
178
179}
180
181
182#[derive(Clone)]
189struct BundleReader<'a> {
190 packets: &'a [Box<Packet>],
192 body: &'a [u8],
194 pos: usize,
196}
197
198impl<'a> BundleReader<'a> {
199
200 fn new(bundle: &'a Bundle) -> Self {
201 let packets = bundle.packets();
202 Self {
203 packets,
204 body: packets.get(0)
205 .map(|p| p.content())
206 .unwrap_or(&[]),
207 pos: 0,
208 }
209 }
210
211 fn packet(&self) -> Option<&'a Packet> {
213 self.packets.get(0).map(|b| &**b)
214 }
215
216 fn ensure(&mut self) -> bool {
221 while self.body.is_empty() {
222 if self.packets.is_empty() {
223 return false; } else {
225 self.packets = &self.packets[1..];
227 if let Some(p) = self.packets.get(0) {
229 self.body = p.content();
230 }
231 }
232 }
233 true
234 }
235
236 fn goto(&mut self, pos: usize) {
240 assert!(pos >= self.pos, "given pos is lower than current pos");
241 let mut remaining = pos - self.pos;
242 while remaining != 0 && self.ensure() {
243 let len = self.body.len().min(remaining);
244 self.pos += len;
245 remaining -= len;
246 }
247 }
248
249}
250
251impl<'a> Read for BundleReader<'a> {
252
253 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
254
255 if !self.ensure() {
256 return Ok(0);
257 }
258
259 let len = buf.len().min(self.body.len());
260 buf[..len].copy_from_slice(&self.body[..len]);
261
262 self.body = &self.body[len..];
263 self.pos += len;
264
265 Ok(len)
266
267 }
268
269}
270
271impl fmt::Debug for BundleReader<'_> {
272 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
273 f.debug_struct("BundleReader")
274 .field("packets", &self.packets)
275 .field("body", &format_args!("{:X}", BytesFmt(self.body)))
276 .field("pos", &self.pos)
277 .finish()
278 }
279}
280
281
282#[derive(Debug, Clone, PartialEq, Eq)]
286pub struct BundleElement<E> {
287 pub id: u8,
289 pub element: E,
292 pub request_id: Option<u32>
295}
296
297impl<E> BundleElement<E> {
298
299 pub fn map<U, F: FnOnce(E) -> U>(self, f: F) -> BundleElement<U> {
302 BundleElement {
303 id: self.id,
304 element: f(self.element),
305 request_id: self.request_id
306 }
307 }
308
309}
310
311impl<E: Element> From<BundleElement<Reply<E>>> for BundleElement<E> {
312 fn from(read: BundleElement<Reply<E>>) -> Self {
313 BundleElement {
314 id: REPLY_ID,
315 element: read.element.element,
316 request_id: read.request_id
317 }
318 }
319}
320
321
322pub struct BundleElementWriter<'a> {
324 bundle: &'a mut Bundle,
325}
326
327impl<'a> BundleElementWriter<'a> {
328
329 fn new(bundle: &'a mut Bundle) -> Self {
330 Self {
331 bundle,
332 }
333 }
334
335 #[inline]
337 pub fn write<E: TopElement>(&mut self, id: u8, element: E, config: &E::Config) {
338 self.write_raw(BundleElement { id, element, request_id: None }, config)
339 }
340
341 #[inline]
343 pub fn write_simple<E: TopElement<Config = ()>>(&mut self, id: u8, element: E) {
344 self.write(id, element, &())
345 }
346
347 #[inline]
349 pub fn write_request<E: TopElement>(&mut self, id: u8, element: E, config: &E::Config, request_id: u32) {
350 self.write_raw(BundleElement { id, element, request_id: Some(request_id) }, config)
351 }
352
353 #[inline]
356 pub fn write_simple_request<E: TopElement<Config = ()>>(&mut self, id: u8, element: E, request_id: u32) {
357 self.write_request(id, element, &(), request_id)
358 }
359
360 #[inline]
366 pub fn write_reply<E: Element>(&mut self, element: E, config: &E::Config, request_id: u32) {
367 self.write(REPLY_ID, Reply::new(request_id, element), config)
368 }
369
370 #[inline]
373 pub fn write_simple_reply<E: Element<Config = ()>>(&mut self, element: E, request_id: u32) {
374 self.write_reply(element, &(), request_id)
375 }
376
377 pub fn write_raw<E: TopElement>(&mut self, element: BundleElement<E>, config: &E::Config) {
380
381 self.bundle.add_packet_if_forced();
382
383 const REQUEST_HEADER_LEN: usize = 6;
384
385 let header_len = 1 + E::LEN.len() + if element.request_id.is_some() { REQUEST_HEADER_LEN } else { 0 };
387 let header_slice = self.bundle.reserve_exact(header_len);
388 header_slice[0] = element.id;
389
390 if let Some(request_id) = element.request_id {
391 let mut request_header_cursor = Cursor::new(&mut header_slice[header_len - 6..]);
392 request_header_cursor.write_u32(request_id).unwrap();
393 request_header_cursor.write_u16(0).unwrap(); }
395
396 let cur_packet_idx = self.bundle.packets.len() - 1;
398
399 let cur_packet = &mut self.bundle.packets[cur_packet_idx];
401 let cur_packet_len = cur_packet.content_len();
402 let cur_packet_elt_offset = cur_packet_len - header_len;
403
404 if element.request_id.is_some() {
406
407 if self.bundle.last_request_header_offset == 0 {
408 cur_packet.set_first_request_offset(PACKET_FLAGS_LEN + cur_packet_elt_offset);
410 } else {
411 Cursor::new(&mut cur_packet.content_mut()[self.bundle.last_request_header_offset + 4..])
413 .write_u16((PACKET_FLAGS_LEN + cur_packet_elt_offset) as u16).unwrap();
414 }
415
416 self.bundle.last_request_header_offset = cur_packet_len - REQUEST_HEADER_LEN;
419
420 }
421
422 let mut writer = IoCounter::new(BundleWriter::new(&mut *self.bundle));
424 element.element.encode(&mut writer, config).unwrap();
426 let length = writer.count() as u32;
427
428 let header_slice = &mut self.bundle.packets[cur_packet_idx].content_mut()[cur_packet_elt_offset..];
430 E::LEN.write(Cursor::new(&mut header_slice[1..]), length).unwrap();
431
432 }
433
434}
435
436
437pub struct BundleElementReader<'a> {
442 bundle_reader: BundleReader<'a>,
443 next_request_offset: usize
444}
445
446impl<'a> BundleElementReader<'a> {
447
448 fn new(bundle: &'a Bundle) -> Self {
450 let bundle_reader = BundleReader::new(bundle);
451 Self {
452 next_request_offset: bundle_reader.packet()
453 .map(|p| p.first_request_offset().unwrap_or(0))
454 .unwrap_or(0),
455 bundle_reader
456 }
457 }
458
459 pub fn is_request(&self) -> bool {
462 let data_pos = self.bundle_reader.pos + PACKET_FLAGS_LEN;
464 self.next_request_offset != 0 && data_pos == self.next_request_offset
465 }
466
467 pub fn next_id(&self) -> Option<u8> {
470 self.bundle_reader.body.get(0).copied()
471 }
472
473 pub fn next_element(&mut self) -> Option<ElementReader<'_, 'a>> {
477 match self.next_id() {
478 Some(REPLY_ID) => {
479 match self.read_element::<ReplyHeader>(&(), false) {
480 Ok(elt) => {
481 debug_assert!(elt.request_id.is_none(), "Replies should not be request at the same time.");
482 Some(ElementReader::Reply(ReplyElementReader(self, elt.element.request_id)))
483 }
484 Err(_) => None
485 }
486 }
487 Some(id) => {
488 Some(ElementReader::Top(TopElementReader(self, id)))
489 }
490 None => None
491 }
492 }
493
494 pub fn read_element<E>(&mut self, config: &E::Config, next: bool) -> BundleResult<BundleElement<E>>
497 where
498 E: TopElement
499 {
500
501 let request = self.is_request();
502 let header_len = E::LEN.len() + 1 + if request { 6 } else { 0 };
503
504 if self.bundle_reader.body.len() < header_len {
505 return Err(BundleError::TooShort)
506 }
507
508 let reader_save = self.bundle_reader.clone();
510
511 match self.read_element_internal::<E>(config, next, request) {
512 Ok(elt) if next => Ok(elt),
513 Ok(elt) => {
514 self.bundle_reader.clone_from(&reader_save);
516 Ok(elt)
517 }
518 Err(e) => {
519 self.bundle_reader.clone_from(&reader_save);
521 Err(BundleError::Io(e))
522 }
523 }
524
525 }
526
527 #[inline(always)]
529 fn read_element_internal<E>(&mut self, config: &E::Config, next: bool, request: bool) -> io::Result<BundleElement<E>>
530 where
531 E: TopElement
532 {
533
534 let start_packet = self.bundle_reader.packet().unwrap();
535
536 let elt_id = self.bundle_reader.read_u8()?;
537 let elt_len = E::LEN.read(&mut self.bundle_reader, elt_id)?;
538
539 let reply_id = if request {
540 let reply_id = self.bundle_reader.read_u32()?;
541 self.next_request_offset = self.bundle_reader.read_u16()? as usize;
542 Some(reply_id)
543 } else {
544 None
545 };
546
547 let elt_data_begin = self.bundle_reader.pos;
548
549 let mut elt_reader = Read::take(&mut self.bundle_reader, elt_len as u64);
550 let element = E::decode(&mut elt_reader, elt_len as usize, config)?;
551
552 if next {
554
555 self.bundle_reader.goto(elt_data_begin + elt_len as usize);
556
557 match self.bundle_reader.packet() {
560 Some(end_packet) => {
561 if !std::ptr::eq(start_packet, end_packet) {
562 self.next_request_offset = end_packet.first_request_offset().unwrap_or(0);
563 }
564 }
566 None => self.next_request_offset = 0
567 }
568
569 }
570
571 Ok(BundleElement {
572 id: elt_id,
573 element,
574 request_id: reply_id
575 })
576
577 }
578
579}
580
581impl fmt::Debug for BundleElementReader<'_> {
582 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
583 f.debug_struct("BundleElementReader")
584 .field("bundle_reader", &self.bundle_reader)
585 .field("next_request_offset", &self.next_request_offset)
586 .field("next_id()", &self.next_id())
587 .field("is_request()", &self.is_request())
588 .finish()
589 }
590}
591
592#[derive(Debug)]
595pub enum ElementReader<'reader, 'bundle> {
596 Top(TopElementReader<'reader, 'bundle>),
598 Reply(ReplyElementReader<'reader, 'bundle>)
600}
601
602impl ElementReader<'_, '_> {
603
604 pub fn is_simple(&self) -> bool {
606 matches!(self, ElementReader::Top(_))
607 }
608
609 pub fn is_reply(&self) -> bool {
611 matches!(self, ElementReader::Reply(_))
612 }
613
614}
615
616#[derive(Debug)]
618pub struct TopElementReader<'reader, 'bundle>(&'reader mut BundleElementReader<'bundle>, u8);
619
620impl TopElementReader<'_, '_> {
621
622 #[inline]
624 pub fn id(&self) -> u8 {
625 self.1
626 }
627
628 pub fn read_stable<E: TopElement>(&mut self, config: &E::Config) -> BundleResult<BundleElement<E>> {
631 self.0.read_element(config, false)
632 }
633
634 #[inline]
635 pub fn read_simple_stable<E: TopElement<Config = ()>>(&mut self) -> BundleResult<BundleElement<E>> {
636 self.read_stable::<E>(&())
637 }
638
639 pub fn read<E: TopElement>(self, config: &E::Config) -> BundleResult<BundleElement<E>> {
643 self.0.read_element(config, true)
644 }
645
646 #[inline]
647 pub fn read_simple<E: TopElement<Config = ()>>(self) -> BundleResult<BundleElement<E>> {
648 self.read::<E>(&())
649 }
650
651}
652
653#[derive(Debug)]
656pub struct ReplyElementReader<'reader, 'bundle>(&'reader mut BundleElementReader<'bundle>, u32);
657
658impl<'reader, 'bundle> ReplyElementReader<'reader, 'bundle> {
659
660 #[inline]
662 pub fn request_id(&self) -> u32 {
663 self.1
664 }
665
666 pub fn read_stable<E: Element>(&mut self, config: &E::Config) -> BundleResult<BundleElement<E>> {
671 self.0.read_element::<Reply<E>>(config, false).map(Into::into)
672 }
673
674 #[inline]
675 pub fn read_simple_stable<E: Element<Config = ()>>(&mut self) -> BundleResult<BundleElement<E>> {
676 self.read_stable::<E>(&())
677 }
678
679 pub fn read<E: Element>(self, config: &E::Config) -> BundleResult<BundleElement<E>> {
685 self.0.read_element::<Reply<E>>(config, true).map(Into::into)
686 }
687
688 #[inline]
689 pub fn read_simple<E: Element<Config = ()>>(self) -> BundleResult<BundleElement<E>> {
690 self.read::<E>(&())
691 }
692
693}
694
695
696#[derive(Debug, Error)]
698pub enum BundleError {
699 #[error("bundle is too short for reading element")]
700 TooShort,
701 #[error("io error: {0}")]
703 Io(#[from] io::Error),
704}
705
706pub type BundleResult<T> = Result<T, BundleError>;