Skip to main content

fwob_v1/
writer.rs

1use std::{
2    fs::{File, OpenOptions},
3    io::{Read, Seek, SeekFrom, Write},
4    path::Path,
5};
6
7use fwob_core::{FrameRef, Key, KeyType, Schema};
8
9use crate::{
10    header::{
11        read_header, update_frame_count, update_string_table_len, write_header, Header,
12        DEFAULT_STRING_TABLE_PRESERVED_LEN, MAX_FIELDS, MAX_FIELD_NAME_LEN, MAX_FRAME_TYPE_LEN,
13        MAX_TITLE_LEN, VERSION,
14    },
15    Result, V1Error,
16};
17
/// Caller-tunable settings used when a new FWOB v1 file is created.
#[derive(Debug, Clone)]
pub struct WriterOptions {
    /// Title stored in the file header. Validation requires it to be non-empty ASCII no longer
    /// than `MAX_TITLE_LEN`.
    pub title: String,
    /// Number of bytes reserved for the string table; the writer zero-fills this many bytes
    /// right after the header. Validation rejects values above `i32::MAX`.
    pub string_table_preserved_length: u32,
}
23
24impl WriterOptions {
25    pub fn new(title: impl Into<String>) -> Self {
26        Self {
27            title: title.into(),
28            string_table_preserved_length: DEFAULT_STRING_TABLE_PRESERVED_LEN,
29        }
30    }
31}
32
/// Append-only writer for the FWOB v1 format on top of a stream `W`.
pub struct Writer<W> {
    /// Underlying stream that receives the header, string table and frames.
    inner: W,
    /// In-memory copy of the file header; its counters are updated alongside the stream.
    header: Header,
    /// Schema describing the layout of each frame.
    schema: Schema,
    /// Key type derived from the schema's key field; used to decode frame keys.
    key_type: KeyType,
    /// Key of the most recently appended frame, or `None` while the file holds no frames.
    last_key: Option<Key>,
}
40
41impl Writer<File> {
42    pub fn create(path: impl AsRef<Path>, schema: Schema, options: WriterOptions) -> Result<Self> {
43        validate_v1_metadata(&schema, &options)?;
44        let file = File::create(path)?;
45        Self::new(file, schema, options)
46    }
47
48    pub fn open_append(path: impl AsRef<Path>, key_field_index: usize) -> Result<Self> {
49        let mut file = OpenOptions::new().read(true).write(true).open(path)?;
50        let actual_len = file.metadata()?.len();
51        file.seek(SeekFrom::Start(0))?;
52        let header = read_header(&mut file)?;
53        let expected_len = header.file_length();
54        if actual_len != expected_len {
55            return Err(V1Error::CorruptedFileLength {
56                expected: expected_len,
57                actual: actual_len,
58            });
59        }
60
61        let schema = header.schema(key_field_index)?;
62        let key_type = KeyType::from_field(schema.key_field())?;
63        let last_key = if header.frame_count == 0 {
64            None
65        } else {
66            let key_field = schema.key_field();
67            let key_offset = header.first_frame_position()
68                + u64::from(header.frame_length) * (header.frame_count - 1)
69                + u64::from(key_field.offset);
70            file.seek(SeekFrom::Start(key_offset))?;
71            let mut bytes = vec![0u8; key_field.length as usize];
72            file.read_exact(&mut bytes)?;
73            Some(Key::decode(key_type, &bytes)?)
74        };
75        file.seek(SeekFrom::Start(expected_len))?;
76        Ok(Self {
77            inner: file,
78            header,
79            schema,
80            key_type,
81            last_key,
82        })
83    }
84}
85
86impl<W: Write + Seek> Writer<W> {
87    pub fn new(mut inner: W, schema: Schema, options: WriterOptions) -> Result<Self> {
88        validate_v1_metadata(&schema, &options)?;
89        let key_type = KeyType::from_field(schema.key_field())?;
90        let header = Header {
91            version: VERSION,
92            field_count: schema.fields.len() as u8,
93            field_lengths: schema.fields.iter().map(|f| f.length as u8).collect(),
94            field_types: schema
95                .fields
96                .iter()
97                .enumerate()
98                .fold(0u64, |acc, (i, f)| acc | ((f.field_type as u64) << (i * 4))),
99            field_names: schema.fields.iter().map(|f| f.name.clone()).collect(),
100            string_count: 0,
101            string_table_length: 0,
102            string_table_preserved_length: options.string_table_preserved_length,
103            frame_count: 0,
104            frame_length: schema.frame_len,
105            frame_type: schema.frame_type.clone(),
106            title: options.title,
107        };
108        write_header(&mut inner, &header)?;
109        inner.write_all(&vec![0; header.string_table_preserved_length as usize])?;
110        inner.flush()?;
111        Ok(Self {
112            inner,
113            header,
114            schema,
115            key_type,
116            last_key: None,
117        })
118    }
119
120    pub fn header(&self) -> &Header {
121        &self.header
122    }
123
124    pub fn schema(&self) -> &Schema {
125        &self.schema
126    }
127
128    pub fn frame_count(&self) -> u64 {
129        self.header.frame_count
130    }
131
132    pub fn append_string(&mut self, value: &str) -> Result<u32> {
133        let encoded_len = dotnet_string_len(value);
134        let required = self.header.string_table_length + encoded_len;
135        if required > self.header.string_table_preserved_length {
136            return Err(V1Error::StringTableOutOfSpace {
137                required,
138                preserved: self.header.string_table_preserved_length,
139            });
140        }
141
142        self.inner
143            .seek(SeekFrom::Start(self.header.string_table_ending()))?;
144        write_dotnet_string(&mut self.inner, value)?;
145        let index = self.header.string_count;
146        self.header.string_count += 1;
147        self.header.string_table_length = required;
148        update_string_table_len(
149            &mut self.inner,
150            self.header.string_count,
151            self.header.string_table_length,
152        )?;
153        self.inner.flush()?;
154        Ok(index)
155    }
156
157    pub fn append_frame(&mut self, bytes: &[u8]) -> Result<()> {
158        let frame = FrameRef::new(&self.schema, bytes)?;
159        let key = frame.key(&self.schema, self.key_type)?;
160        if let Some(last_key) = self.last_key {
161            if key < last_key {
162                return Err(V1Error::KeyOrderViolation {
163                    index: self.header.frame_count,
164                });
165            }
166        }
167        self.inner
168            .seek(SeekFrom::Start(self.header.file_length()))?;
169        self.inner.write_all(bytes)?;
170        self.header.frame_count += 1;
171        self.last_key = Some(key);
172        update_frame_count(&mut self.inner, self.header.frame_count)?;
173        self.inner.flush()?;
174        Ok(())
175    }
176
177    pub fn append_presorted_raw_frames(&mut self, bytes: &[u8]) -> Result<()> {
178        let frame_len = self.schema.frame_len as usize;
179        if bytes.len() % frame_len != 0 {
180            return Err(V1Error::Core(fwob_core::FwobError::InvalidFrameLength {
181                expected: frame_len,
182                actual: bytes.len(),
183            }));
184        }
185        if bytes.is_empty() {
186            return Ok(());
187        }
188
189        let first = FrameRef::new(&self.schema, &bytes[..frame_len])?;
190        let first_key = first.key(&self.schema, self.key_type)?;
191        if let Some(last_key) = self.last_key {
192            if first_key < last_key {
193                return Err(V1Error::KeyOrderViolation {
194                    index: self.header.frame_count,
195                });
196            }
197        }
198
199        let last_offset = bytes.len() - frame_len;
200        let last = FrameRef::new(&self.schema, &bytes[last_offset..])?;
201        self.last_key = Some(last.key(&self.schema, self.key_type)?);
202
203        self.inner
204            .seek(SeekFrom::Start(self.header.file_length()))?;
205        self.inner.write_all(bytes)?;
206        self.header.frame_count += (bytes.len() / frame_len) as u64;
207        update_frame_count(&mut self.inner, self.header.frame_count)?;
208        self.inner.flush()?;
209        Ok(())
210    }
211
212    pub fn append_raw_frames_transactional(&mut self, bytes: &[u8]) -> Result<()> {
213        let frame_len = self.schema.frame_len as usize;
214        if bytes.len() % frame_len != 0 {
215            return Err(V1Error::Core(fwob_core::FwobError::InvalidFrameLength {
216                expected: frame_len,
217                actual: bytes.len(),
218            }));
219        }
220        if bytes.is_empty() {
221            return Ok(());
222        }
223
224        let mut last_key = self.last_key;
225        for (offset, frame_bytes) in bytes.chunks_exact(frame_len).enumerate() {
226            let frame = FrameRef::new(&self.schema, frame_bytes)?;
227            let key = frame.key(&self.schema, self.key_type)?;
228            if let Some(previous) = last_key {
229                if key < previous {
230                    return Err(V1Error::KeyOrderViolation {
231                        index: self.header.frame_count + offset as u64,
232                    });
233                }
234            }
235            last_key = Some(key);
236        }
237
238        self.inner
239            .seek(SeekFrom::Start(self.header.file_length()))?;
240        self.inner.write_all(bytes)?;
241        self.header.frame_count += (bytes.len() / frame_len) as u64;
242        self.last_key = last_key;
243        update_frame_count(&mut self.inner, self.header.frame_count)?;
244        self.inner.flush()?;
245        Ok(())
246    }
247}
248
249fn validate_v1_metadata(schema: &Schema, options: &WriterOptions) -> Result<()> {
250    let valid_ascii = |value: &str, max_len: usize| {
251        !value.is_empty() && value.is_ascii() && value.len() <= max_len
252    };
253    if schema.fields.len() > MAX_FIELDS
254        || !valid_ascii(&schema.frame_type, MAX_FRAME_TYPE_LEN)
255        || !valid_ascii(&options.title, MAX_TITLE_LEN)
256        // Field semantics (and the key-field index) have no slot in the v1 on-disk format. They
257        // are accepted as in-memory schema attributes but not persisted, so they read back as the
258        // default (`FieldSemantic::None`). v2 is the format that persists semantics.
259        || schema.fields.iter().any(|field| {
260            !valid_ascii(&field.name, MAX_FIELD_NAME_LEN) || field.length > u8::MAX as u16
261        })
262        || options.string_table_preserved_length > i32::MAX as u32
263        || schema.frame_len > i32::MAX as u32
264    {
265        return Err(V1Error::Core(fwob_core::FwobError::InvalidSchema(
266            "schema or metadata exceeds FWOB v1 limits".into(),
267        )));
268    }
269    Ok(())
270}
271
272pub(crate) fn write_dotnet_string<W: Write>(writer: &mut W, value: &str) -> Result<()> {
273    write_7bit_encoded_int(writer, value.len() as u32)?;
274    writer.write_all(value.as_bytes())?;
275    Ok(())
276}
277
/// Returns the number of bytes `value` occupies when stored as a length-prefixed string: the
/// size of the 7-bit encoded length prefix plus the UTF-8 payload.
///
/// The result saturates at `u32::MAX` instead of truncating or overflowing, so an oversized
/// string is never reported as smaller than it really is.
fn dotnet_string_len(value: &str) -> u32 {
    // Clamp first so the narrowing cast cannot drop high bits.
    let payload = value.len().min(u32::MAX as usize) as u32;
    // Each prefix byte carries 7 bits of the length.
    let prefix: u32 = match payload {
        0..=0x7f => 1,
        0x80..=0x3fff => 2,
        0x4000..=0x1f_ffff => 3,
        0x20_0000..=0x0fff_ffff => 4,
        _ => 5,
    };
    prefix.saturating_add(payload)
}
293
294fn write_7bit_encoded_int<W: Write>(writer: &mut W, mut value: u32) -> Result<()> {
295    while value >= 0x80 {
296        writer.write_all(&[((value as u8) & 0x7f) | 0x80])?;
297        value >>= 7;
298    }
299    writer.write_all(&[value as u8])?;
300    Ok(())
301}