binary_cookies/sync/
stream.rs

1use std::{io::Read, mem};
2
3use oval::Buffer;
4use snafu::ResultExt;
5
6use crate::{
7    decode::{
8        binary_cookies::BinaryCookieFsm,
9        cookies::CookieFsm,
10        meta::MetaFsm,
11        pages::PageFsm,
12        stream::{State, Values},
13        DecodeResult,
14    },
15    error::{self, Result},
16};
17
18#[derive(Clone)]
19#[derive(Debug)]
20pub struct StreamDecoder<R: Read> {
21    state: State,
22    rd: R,
23}
24
25impl<R: Read> StreamDecoder<R> {
26    const BUF_SIZE: usize = 64;
27
28    pub fn new(rd: R) -> Self {
29        Self {
30            state: State::Bc {
31                fsm: BinaryCookieFsm {
32                    buffer: Buffer::with_capacity(Self::BUF_SIZE),
33                },
34            },
35            rd,
36        }
37    }
38
39    pub fn decode(&mut self) -> Result<Values> {
40        match mem::take(&mut self.state) {
41            State::Bc { mut fsm } => loop {
42                let readed = self
43                    .rd
44                    .read(fsm.buffer.space())
45                    .context(error::ReadSnafu)?;
46                fsm.buffer.fill(readed);
47
48                match fsm.process()? {
49                    DecodeResult::Continue(fsm_) => {
50                        fsm = fsm_;
51                        continue;
52                    },
53                    DecodeResult::Done((meta_offset, pages_offset, buffer)) => {
54                        self.state = State::Page {
55                            fsm: PageFsm { buffer },
56                            remaining_page: pages_offset.offset_sizes.len() as u32,
57                        };
58                        return Ok(Values::Bc { meta_offset, pages_offset });
59                    },
60                }
61            },
62            State::Page { mut fsm, remaining_page } => loop {
63                let readed = self
64                    .rd
65                    .read(fsm.buffer.space())
66                    .context(error::ReadSnafu)?;
67                fsm.buffer.fill(readed);
68                match fsm.process()? {
69                    DecodeResult::Continue(fsm_) => {
70                        fsm = fsm_;
71                        continue;
72                    },
73                    DecodeResult::Done((c, buffer)) => {
74                        self.state = State::Cookie {
75                            fsm: CookieFsm { buffer },
76                            remaining_cookie: c.len() as u32,
77                            remaining_page: remaining_page - 1,
78                        };
79                        return Ok(Values::Page(c));
80                    },
81                }
82            },
83            State::Cookie {
84                remaining_cookie,
85                remaining_page,
86                mut fsm,
87            } => loop {
88                let readed = self
89                    .rd
90                    .read(fsm.buffer.space())
91                    .context(error::ReadSnafu)?;
92                fsm.buffer.fill(readed);
93
94                match fsm.process()? {
95                    DecodeResult::Continue(fsm_) => {
96                        fsm = fsm_;
97                        continue;
98                    },
99                    DecodeResult::Done((cookie, buffer)) => {
100                        let remaining_cookie = remaining_cookie - 1;
101                        match (remaining_cookie, remaining_page) {
102                            (0, 0) => self.state = State::Meta { fsm: MetaFsm { buffer } },
103                            (0, _) => {
104                                self.state = State::Page {
105                                    fsm: PageFsm { buffer },
106                                    remaining_page,
107                                };
108                            },
109                            _ => {
110                                self.state = State::Cookie {
111                                    fsm: CookieFsm { buffer },
112                                    remaining_cookie,
113                                    remaining_page,
114                                };
115                            },
116                        }
117                        return Ok(Values::Cookie(cookie));
118                    },
119                }
120            },
121            State::Meta { mut fsm } => loop {
122                let readed = self
123                    .rd
124                    .read(fsm.buffer.space())
125                    .context(error::ReadSnafu)?;
126                fsm.buffer.fill(readed);
127
128                match fsm.process()? {
129                    DecodeResult::Continue(fsm_) => {
130                        fsm = fsm_;
131                        continue;
132                    },
133                    DecodeResult::Done((checksum, meta)) => {
134                        self.state = State::Finished;
135                        return Ok(Values::Meta { checksum, meta });
136                    },
137                }
138            },
139            State::Finished => Err(error::ParsingCompletedSnafu.build()),
140            State::Transition => unreachable!(),
141        }
142    }
143}