Skip to main content

cityjson_arrow/
codec.rs

1use crate::convert;
2use crate::error::{Error, Result};
3use crate::schema::{CityArrowHeader, CityArrowPackageVersion, ProjectionLayout};
4use crate::stream;
5use crate::transport::{CanonicalTable, CanonicalTableSink};
6use arrow::record_batch::RecordBatch;
7use cityjson::relational::RelationalAccess;
8use cityjson::v2_0::OwnedCityModel;
9use std::collections::VecDeque;
10use std::io::{Read, Write};
11
/// Arrow package schema versions this crate knows how to read and write.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum SchemaVersion {
    /// The `v3alpha3` package layout.
    V3Alpha3,
}
16
17impl SchemaVersion {
18    #[must_use]
19    pub const fn package_version(self) -> CityArrowPackageVersion {
20        match self {
21            Self::V3Alpha3 => CityArrowPackageVersion::V3Alpha3,
22        }
23    }
24}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct ExportOptions {
28    pub schema_version: SchemaVersion,
29    pub batch_row_limit: usize,
30    pub dictionary_encode_strings: bool,
31}
32
33impl Default for ExportOptions {
34    fn default() -> Self {
35        Self {
36            schema_version: SchemaVersion::V3Alpha3,
37            batch_row_limit: 65_536,
38            dictionary_encode_strings: true,
39        }
40    }
41}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
44pub struct ImportOptions {
45    pub expected_schema_version: Option<SchemaVersion>,
46    pub symbol_storage: cityjson::symbols::SymbolStorageOptions,
47    pub validate_schema: bool,
48}
49
50impl Default for ImportOptions {
51    fn default() -> Self {
52        Self {
53            expected_schema_version: Some(SchemaVersion::V3Alpha3),
54            symbol_storage: cityjson::symbols::SymbolStorageOptions::default(),
55            validate_schema: true,
56        }
57    }
58}
59
/// Summary counters returned by [`write_stream`].
#[derive(Debug, Clone, Copy, Default)]
#[derive(PartialEq, Eq)]
pub struct WriteReport {
    /// Number of record batches written.
    pub batch_count: usize,
    /// Total number of rows across all written batches.
    pub row_count: usize,
    /// Size of the written payload, presumably in bytes — confirm against
    /// `stream::write_model_stream`.
    pub payload_bytes: u64,
}
66
67pub struct ModelBatchReader {
68    header: CityArrowHeader,
69    projection: ProjectionLayout,
70    batches: VecDeque<(CanonicalTable, RecordBatch)>,
71}
72
73impl ModelBatchReader {
74    #[must_use]
75    pub const fn header(&self) -> &CityArrowHeader {
76        &self.header
77    }
78
79    #[must_use]
80    pub const fn projection(&self) -> &ProjectionLayout {
81        &self.projection
82    }
83}
84
85impl Iterator for ModelBatchReader {
86    type Item = (CanonicalTable, RecordBatch);
87
88    fn next(&mut self) -> Option<Self::Item> {
89        self.batches.pop_front()
90    }
91}
92
93pub struct ModelBatchDecoder(convert::IncrementalDecoder);
94
95impl ModelBatchDecoder {
96    /// # Errors
97    ///
98    /// Returns an error when the schema version is not supported or the
99    /// canonical projection is invalid.
100    pub fn new(
101        header: CityArrowHeader,
102        projection: ProjectionLayout,
103        options: &ImportOptions,
104    ) -> Result<Self> {
105        validate_expected_schema_version(options, header.package_version)?;
106        convert::IncrementalDecoder::new(header, projection).map(Self)
107    }
108
109    /// # Errors
110    ///
111    /// Returns an error when the batch order, schema, or data is invalid.
112    pub fn push_batch(&mut self, table: CanonicalTable, batch: &RecordBatch) -> Result<()> {
113        self.0.push_batch(table, batch)
114    }
115
116    /// # Errors
117    ///
118    /// Returns an error when required tables are missing or reconstruction
119    /// cannot finish successfully.
120    pub fn finish(self) -> Result<OwnedCityModel> {
121        self.0.finish()
122    }
123}
124
125/// # Errors
126///
127/// Returns an error when export conversion fails.
128pub fn export_reader(model: &OwnedCityModel, options: &ExportOptions) -> Result<ModelBatchReader> {
129    validate_export_schema_version(options)?;
130    let relational = model.relational();
131    let mut sink = BatchReaderSink::default();
132    convert::emit_tables(&relational, &mut sink)?;
133    sink.finish()
134}
135
136/// # Errors
137///
138/// Returns an error when ordered batch import fails.
139pub fn import_batches<I>(
140    header: CityArrowHeader,
141    projection: ProjectionLayout,
142    batches: I,
143    options: &ImportOptions,
144) -> Result<OwnedCityModel>
145where
146    I: IntoIterator<Item = (CanonicalTable, RecordBatch)>,
147{
148    let mut decoder = ModelBatchDecoder::new(header, projection, options)?;
149    for (table, batch) in batches {
150        decoder.push_batch(table, &batch)?;
151    }
152    decoder.finish()
153}
154
155/// # Errors
156///
157/// Returns an error when conversion or stream serialization fails.
158pub fn write_stream<W: Write>(
159    writer: W,
160    model: &OwnedCityModel,
161    options: &ExportOptions,
162) -> Result<WriteReport> {
163    validate_export_schema_version(options)?;
164    let relational = model.relational();
165    stream::write_model_stream(&relational, writer)
166}
167
168/// # Errors
169///
170/// Returns an error when stream decoding or model reconstruction fails.
171pub fn read_stream<R: Read>(reader: R, options: &ImportOptions) -> Result<OwnedCityModel> {
172    let (header, projection, batches) = stream::read_stream_batches(reader)?;
173    let ordered_batches = batches
174        .into_iter()
175        .map(|(table, expected_rows, batch)| {
176            if batch.num_rows() == expected_rows {
177                Ok((table, batch))
178            } else {
179                Err(Error::Conversion(format!(
180                    "{} frame declared {expected_rows} rows but decoded {} rows",
181                    table.as_str(),
182                    batch.num_rows()
183                )))
184            }
185        })
186        .collect::<Result<Vec<_>>>()?;
187    import_batches(header, projection, ordered_batches, options)
188}
189
190fn validate_export_schema_version(options: &ExportOptions) -> Result<()> {
191    if options.schema_version == SchemaVersion::V3Alpha3 {
192        Ok(())
193    } else {
194        Err(Error::Unsupported(format!(
195            "schema version '{}' is not supported by this crate",
196            options.schema_version.package_version().as_str()
197        )))
198    }
199}
200
201fn validate_expected_schema_version(
202    options: &ImportOptions,
203    actual: CityArrowPackageVersion,
204) -> Result<()> {
205    if let Some(expected) = options.expected_schema_version
206        && expected.package_version() != actual
207    {
208        return Err(Error::Unsupported(format!(
209            "stream uses '{}' but '{}' was requested",
210            actual.as_str(),
211            expected.package_version().as_str()
212        )));
213    }
214    Ok(())
215}
216
217#[derive(Default)]
218struct BatchReaderSink {
219    header: Option<CityArrowHeader>,
220    projection: Option<ProjectionLayout>,
221    batches: VecDeque<(CanonicalTable, RecordBatch)>,
222}
223
224impl BatchReaderSink {
225    fn finish(self) -> Result<ModelBatchReader> {
226        Ok(ModelBatchReader {
227            header: self
228                .header
229                .ok_or_else(|| Error::Conversion("missing canonical table header".to_string()))?,
230            projection: self.projection.ok_or_else(|| {
231                Error::Conversion("missing canonical table projection".to_string())
232            })?,
233            batches: self.batches,
234        })
235    }
236}
237
238impl CanonicalTableSink for BatchReaderSink {
239    fn start(&mut self, header: &CityArrowHeader, projection: &ProjectionLayout) -> Result<()> {
240        self.header = Some(header.clone());
241        self.projection = Some(projection.clone());
242        Ok(())
243    }
244
245    fn push_batch(&mut self, table: CanonicalTable, batch: RecordBatch) -> Result<()> {
246        self.batches.push_back((table, batch));
247        Ok(())
248    }
249}