// loro_preload/encode.rs

use std::borrow::Cow;

use bytes::{BufMut, BytesMut};
use loro_common::{ContainerID, InternalString, LoroError, LoroResult, LoroValue, ID};
use serde::{Deserialize, Serialize};
use serde_columnar::{columnar, to_vec, ColumnarError};

8/// The final phase of the encoding process. It's also the first phase of the decoding process.
9///
10/// This data structure allows users to only load the state or the oplog.
11///
12/// - When only the state is needed, the `oplog` and `oplog_extra_arena` can be ignored.
13/// - When only the oplog is needed, the `app_state` can be ignored. (state_arena is still needed).
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct FinalPhase<'a> {
16    #[serde(borrow)]
17    pub common: Cow<'a, [u8]>, // -> CommonArena
18    #[serde(borrow)]
19    pub app_state: Cow<'a, [u8]>, // -> EncodedAppState
20    #[serde(borrow)]
21    pub state_arena: Cow<'a, [u8]>, // -> TempArena<'a>
22    #[serde(borrow)]
23    pub oplog_extra_arena: Cow<'a, [u8]>, // -> TempArena<'a>. Cannot have full history if this is dropped
24    #[serde(borrow)]
25    pub oplog: Cow<'a, [u8]>, // -> OpLog. Can be ignored if we only need state
26}
27
28impl<'a> FinalPhase<'a> {
29    #[inline(always)]
30    pub fn encode(&self) -> Vec<u8> {
31        let mut bytes = BytesMut::with_capacity(
32            self.common.len()
33                + self.app_state.len()
34                + self.state_arena.len()
35                + self.oplog_extra_arena.len()
36                + self.oplog.len()
37                + 10,
38        );
39
40        leb::write_unsigned(&mut bytes, self.common.len() as u64);
41        bytes.put_slice(&self.common);
42        leb::write_unsigned(&mut bytes, self.app_state.len() as u64);
43        bytes.put_slice(&self.app_state);
44        leb::write_unsigned(&mut bytes, self.state_arena.len() as u64);
45        bytes.put_slice(&self.state_arena);
46        leb::write_unsigned(&mut bytes, self.oplog_extra_arena.len() as u64);
47        bytes.put_slice(&self.oplog_extra_arena);
48        leb::write_unsigned(&mut bytes, self.oplog.len() as u64);
49        bytes.put_slice(&self.oplog);
50        bytes.to_vec()
51    }
52
53    #[inline(always)]
54    pub fn decode(bytes: &'a [u8]) -> Result<Self, LoroError> {
55        let mut index = 0;
56        let len = leb::read_unsigned(bytes, &mut index) as usize;
57        let common = &bytes[index..index + len];
58        index += len;
59
60        let len = leb::read_unsigned(bytes, &mut index) as usize;
61        let app_state = &bytes[index..index + len];
62        index += len;
63
64        let len = leb::read_unsigned(bytes, &mut index) as usize;
65        let state_arena = &bytes[index..index + len];
66        index += len;
67
68        let len = leb::read_unsigned(bytes, &mut index) as usize;
69        let additional_arena = &bytes[index..index + len];
70        index += len;
71
72        let len = leb::read_unsigned(bytes, &mut index) as usize;
73        let oplog = &bytes[index..index + len];
74
75        Ok(FinalPhase {
76            common: Cow::Borrowed(common),
77            app_state: Cow::Borrowed(app_state),
78            state_arena: Cow::Borrowed(state_arena),
79            oplog_extra_arena: Cow::Borrowed(additional_arena),
80            oplog: Cow::Borrowed(oplog),
81        })
82    }
83
84    pub fn diagnose_size(&self) {
85        println!("common: {}", self.common.len());
86        println!("app_state: {}", self.app_state.len());
87        println!("state_arena: {}", self.state_arena.len());
88        println!("additional_arena: {}", self.oplog_extra_arena.len());
89        println!("oplog: {}", self.oplog.len());
90    }
91}
92
93#[derive(Debug, Default, Clone, Serialize, Deserialize)]
94pub struct CommonArena<'a> {
95    #[serde(borrow)]
96    pub peer_ids: Cow<'a, [u64]>,
97    pub container_ids: Vec<ContainerID>,
98}
99
100impl<'a> CommonArena<'a> {
101    pub fn encode(&self) -> Vec<u8> {
102        to_vec(self).unwrap()
103    }
104
105    pub fn decode(data: &'a FinalPhase) -> Result<Self, LoroError> {
106        serde_columnar::from_bytes(&data.common)
107            .map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str()))
108    }
109}
110
111#[derive(Debug, Clone, Default, Serialize, Deserialize)]
112pub struct EncodedAppState<'a> {
113    pub frontiers: Vec<ID>,
114    /// container states
115    #[serde(borrow)]
116    pub states: Vec<EncodedContainerState<'a>>,
117    /// containers' parents
118    pub parents: Vec<Option<u32>>,
119}
120
121impl<'a> EncodedAppState<'a> {
122    pub fn encode(&self) -> Vec<u8> {
123        to_vec(self).unwrap()
124    }
125
126    pub fn decode(data: &'a FinalPhase) -> Result<EncodedAppState<'a>, LoroError> {
127        serde_columnar::from_bytes(&data.app_state)
128            .map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str()))
129    }
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub enum EncodedContainerState<'a> {
134    Map(Vec<MapEntry>),
135    List {
136        elem_idx: Vec<usize>,
137        elem_ids: Vec<ID>,
138    },
139    #[serde(borrow)]
140    Richtext(Box<EncodedRichtextState<'a>>),
141    Tree((Vec<EncodedTreeNode>, Vec<usize>)),
142}
143
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct EncodedTreeNode {
146    pub node_idx: usize,
147    pub parent: Option<usize>,
148    pub id: ID,
149}
150
151#[derive(Debug, Default, Clone, Serialize, Deserialize)]
152pub struct EncodedRichtextState<'a> {
153    /// It's composed of interleaved:
154    ///
155    /// - len of text ranges
156    /// - len of styles anchors
157    pub len: Vec<u32>,
158    /// This is encoded [TextRanges]
159    #[serde(borrow)]
160    pub text_bytes: Cow<'a, [u8]>,
161    pub ids: Vec<(u32, u32)>,
162    /// Style anchor index in the style arena
163    // TODO: can be optimized
164    pub styles: Vec<CompactStyleOp>,
165    /// It is a start or end anchor. It's indexed by bit position.
166    pub is_style_start: Vec<u8>,
167}
168
169#[columnar(vec, ser, de, iterable)]
170#[derive(Debug, Clone, Copy)]
171pub struct TextRange {
172    #[columnar(strategy = "DeltaRle")]
173    pub start: usize,
174    #[columnar(strategy = "DeltaRle")]
175    pub len: usize,
176}
177
178#[columnar(ser, de)]
179#[derive(Debug, Default)]
180pub struct TextRanges {
181    #[columnar(class = "vec", iter = "TextRange")]
182    pub ranges: Vec<TextRange>,
183}
184
185impl TextRanges {
186    #[inline]
187    pub fn decode_iter(
188        bytes: &[u8],
189    ) -> LoroResult<impl Iterator<Item = Result<TextRange, ColumnarError>> + '_> {
190        let iter = serde_columnar::iter_from_bytes::<TextRanges>(bytes)?;
191        Ok(iter.ranges)
192    }
193
194    #[inline]
195    pub fn encode(&self) -> Vec<u8> {
196        to_vec(self).unwrap()
197    }
198}
199
200impl<'a> EncodedContainerState<'a> {
201    pub fn container_type(&self) -> loro_common::ContainerType {
202        match self {
203            EncodedContainerState::Map(_) => loro_common::ContainerType::Map,
204            EncodedContainerState::List { .. } => loro_common::ContainerType::List,
205            EncodedContainerState::Tree(_) => loro_common::ContainerType::Tree,
206            EncodedContainerState::Richtext { .. } => loro_common::ContainerType::Text,
207        }
208    }
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct MapEntry {
213    pub key: usize,   // index to the state arena
214    pub value: usize, // index to the state arena + 1. 0 means None
215    pub peer: u32,    // index to the peer ids
216    pub counter: u32, // index to the peer ids
217    pub lamport: u32,
218}
219
220#[derive(Debug, Default, Clone, Serialize, Deserialize)]
221pub struct CompactStyleOp {
222    /// index to the peer idx
223    pub peer_idx: u32,
224    /// index to the keywords idx
225    pub key_idx: u32,
226    pub counter: u32,
227    pub lamport: u32,
228    pub style_info: u8,
229    pub value: LoroValue,
230}
231
232#[derive(Debug, Default, Clone, Serialize, Deserialize)]
233pub struct TempArena<'a> {
234    #[serde(borrow)]
235    pub text: Cow<'a, [u8]>,
236    // PERF: can we use a Cow here?
237    pub keywords: Vec<InternalString>,
238    pub values: Vec<LoroValue>,
239    pub tree_ids: Vec<(u32, i32)>,
240}
241
242impl<'a> TempArena<'a> {
243    pub fn encode(&self) -> Vec<u8> {
244        to_vec(self).unwrap()
245    }
246
247    pub fn decode_state_arena(data: &'a FinalPhase) -> Result<Self, LoroError> {
248        serde_columnar::from_bytes(&data.state_arena)
249            .map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str()))
250    }
251
252    pub fn decode_additional_arena(data: &'a FinalPhase) -> Result<Self, LoroError> {
253        serde_columnar::from_bytes(&data.oplog_extra_arena)
254            .map_err(|e| LoroError::DecodeError(e.to_string().into_boxed_str()))
255    }
256}
257
258/// returns a deep LoroValue that wraps the whole state
259pub fn decode_state(_bytes: &[u8]) -> LoroValue {
260    unimplemented!()
261}
262
263mod leb {
264    use bytes::{BufMut, BytesMut};
265    pub const CONTINUATION_BIT: u8 = 1 << 7;
266
267    pub fn write_unsigned(w: &mut BytesMut, mut val: u64) -> usize {
268        let mut bytes_written = 0;
269        loop {
270            let mut byte = low_bits_of_u64(val);
271            val >>= 7;
272            if val != 0 {
273                // More bytes to come, so set the continuation bit.
274                byte |= CONTINUATION_BIT;
275            }
276
277            w.put_u8(byte);
278            bytes_written += 1;
279
280            if val == 0 {
281                return bytes_written;
282            }
283        }
284    }
285
286    #[doc(hidden)]
287    #[inline]
288    pub fn low_bits_of_byte(byte: u8) -> u8 {
289        byte & !CONTINUATION_BIT
290    }
291
292    #[doc(hidden)]
293    #[inline]
294    pub fn low_bits_of_u64(val: u64) -> u8 {
295        let byte = val & (std::u8::MAX as u64);
296        low_bits_of_byte(byte as u8)
297    }
298
299    pub fn read_unsigned(r: &[u8], index: &mut usize) -> u64 {
300        let mut result = 0;
301        let mut shift = 0;
302
303        loop {
304            let mut buf = [r[*index]];
305            *index += 1;
306
307            if shift == 63 && buf[0] != 0x00 && buf[0] != 0x01 {
308                while buf[0] & CONTINUATION_BIT != 0 {
309                    buf = [r[*index]];
310                    *index += 1;
311                }
312
313                panic!("overflow");
314            }
315
316            let low_bits = low_bits_of_byte(buf[0]) as u64;
317            result |= low_bits << shift;
318
319            if buf[0] & CONTINUATION_BIT == 0 {
320                return result;
321            }
322
323            shift += 7;
324        }
325    }
326}