binary_cookies/sync/
stream.rs1use std::{io::Read, mem};
2
3use oval::Buffer;
4use snafu::ResultExt;
5
6use crate::{
7 decode::{
8 binary_cookies::BinaryCookieFsm,
9 cookies::CookieFsm,
10 meta::MetaFsm,
11 pages::PageFsm,
12 stream::{State, Values},
13 DecodeResult,
14 },
15 error::{self, Result},
16};
17
18#[derive(Clone)]
19#[derive(Debug)]
20pub struct StreamDecoder<R: Read> {
21 state: State,
22 rd: R,
23}
24
25impl<R: Read> StreamDecoder<R> {
26 const BUF_SIZE: usize = 64;
27
28 pub fn new(rd: R) -> Self {
29 Self {
30 state: State::Bc {
31 fsm: BinaryCookieFsm {
32 buffer: Buffer::with_capacity(Self::BUF_SIZE),
33 },
34 },
35 rd,
36 }
37 }
38
39 pub fn decode(&mut self) -> Result<Values> {
40 match mem::take(&mut self.state) {
41 State::Bc { mut fsm } => loop {
42 let readed = self
43 .rd
44 .read(fsm.buffer.space())
45 .context(error::ReadSnafu)?;
46 fsm.buffer.fill(readed);
47
48 match fsm.process()? {
49 DecodeResult::Continue(fsm_) => {
50 fsm = fsm_;
51 continue;
52 },
53 DecodeResult::Done((meta_offset, pages_offset, buffer)) => {
54 self.state = State::Page {
55 fsm: PageFsm { buffer },
56 remaining_page: pages_offset.offset_sizes.len() as u32,
57 };
58 return Ok(Values::Bc { meta_offset, pages_offset });
59 },
60 }
61 },
62 State::Page { mut fsm, remaining_page } => loop {
63 let readed = self
64 .rd
65 .read(fsm.buffer.space())
66 .context(error::ReadSnafu)?;
67 fsm.buffer.fill(readed);
68 match fsm.process()? {
69 DecodeResult::Continue(fsm_) => {
70 fsm = fsm_;
71 continue;
72 },
73 DecodeResult::Done((c, buffer)) => {
74 self.state = State::Cookie {
75 fsm: CookieFsm { buffer },
76 remaining_cookie: c.len() as u32,
77 remaining_page: remaining_page - 1,
78 };
79 return Ok(Values::Page(c));
80 },
81 }
82 },
83 State::Cookie {
84 remaining_cookie,
85 remaining_page,
86 mut fsm,
87 } => loop {
88 let readed = self
89 .rd
90 .read(fsm.buffer.space())
91 .context(error::ReadSnafu)?;
92 fsm.buffer.fill(readed);
93
94 match fsm.process()? {
95 DecodeResult::Continue(fsm_) => {
96 fsm = fsm_;
97 continue;
98 },
99 DecodeResult::Done((cookie, buffer)) => {
100 let remaining_cookie = remaining_cookie - 1;
101 match (remaining_cookie, remaining_page) {
102 (0, 0) => self.state = State::Meta { fsm: MetaFsm { buffer } },
103 (0, _) => {
104 self.state = State::Page {
105 fsm: PageFsm { buffer },
106 remaining_page,
107 };
108 },
109 _ => {
110 self.state = State::Cookie {
111 fsm: CookieFsm { buffer },
112 remaining_cookie,
113 remaining_page,
114 };
115 },
116 }
117 return Ok(Values::Cookie(cookie));
118 },
119 }
120 },
121 State::Meta { mut fsm } => loop {
122 let readed = self
123 .rd
124 .read(fsm.buffer.space())
125 .context(error::ReadSnafu)?;
126 fsm.buffer.fill(readed);
127
128 match fsm.process()? {
129 DecodeResult::Continue(fsm_) => {
130 fsm = fsm_;
131 continue;
132 },
133 DecodeResult::Done((checksum, meta)) => {
134 self.state = State::Finished;
135 return Ok(Values::Meta { checksum, meta });
136 },
137 }
138 },
139 State::Finished => Err(error::ParsingCompletedSnafu.build()),
140 State::Transition => unreachable!(),
141 }
142 }
143}