1use crate::convert;
2use crate::error::{Error, Result};
3use crate::schema::{CityArrowHeader, CityArrowPackageVersion, ProjectionLayout};
4use crate::stream;
5use crate::transport::{CanonicalTable, CanonicalTableSink};
6use arrow::record_batch::RecordBatch;
7use cityjson::relational::RelationalAccess;
8use cityjson::v2_0::OwnedCityModel;
9use std::collections::VecDeque;
10use std::io::{Read, Write};
11
/// Schema revisions of the CityArrow package that this crate can read and write.
///
/// Only one revision is currently supported; the enum exists so future
/// revisions can be added without changing call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaVersion {
    /// Maps to [`CityArrowPackageVersion::V3Alpha3`] on the wire.
    V3Alpha3,
}
16
impl SchemaVersion {
    /// Returns the wire-level package version corresponding to this schema version.
    #[must_use]
    pub const fn package_version(self) -> CityArrowPackageVersion {
        match self {
            Self::V3Alpha3 => CityArrowPackageVersion::V3Alpha3,
        }
    }
}
25
/// Options controlling how a city model is exported to Arrow batches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportOptions {
    /// Schema revision to emit; must be supported by this crate
    /// (checked by `validate_export_schema_version`).
    pub schema_version: SchemaVersion,
    // NOTE(review): presumably an upper bound on rows per emitted batch, but
    // it is not consumed anywhere in this file — confirm the conversion layer
    // reads it.
    pub batch_row_limit: usize,
    // NOTE(review): presumably toggles dictionary encoding for string columns;
    // also not consumed in this file — confirm downstream use.
    pub dictionary_encode_strings: bool,
}
32
33impl Default for ExportOptions {
34 fn default() -> Self {
35 Self {
36 schema_version: SchemaVersion::V3Alpha3,
37 batch_row_limit: 65_536,
38 dictionary_encode_strings: true,
39 }
40 }
41}
42
/// Options controlling how Arrow batches are imported back into a city model.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImportOptions {
    /// When `Some`, the stream header's package version must match this
    /// version (enforced by `validate_expected_schema_version`); `None`
    /// accepts any version.
    pub expected_schema_version: Option<SchemaVersion>,
    // NOTE(review): forwarded symbol-storage configuration; not consumed in
    // this file — confirm the decode path applies it.
    pub symbol_storage: cityjson::symbols::SymbolStorageOptions,
    // NOTE(review): not consumed in this file — confirm schema validation
    // actually happens somewhere when this is true.
    pub validate_schema: bool,
}
49
50impl Default for ImportOptions {
51 fn default() -> Self {
52 Self {
53 expected_schema_version: Some(SchemaVersion::V3Alpha3),
54 symbol_storage: cityjson::symbols::SymbolStorageOptions::default(),
55 validate_schema: true,
56 }
57 }
58}
59
/// Summary statistics produced by writing a model stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct WriteReport {
    /// Number of record batches written.
    pub batch_count: usize,
    /// Total number of rows across all written batches.
    pub row_count: usize,
    /// Total payload size in bytes.
    pub payload_bytes: u64,
}
66
/// In-memory reader over the Arrow batches produced from a single model.
///
/// Yields `(CanonicalTable, RecordBatch)` pairs in emission order via its
/// `Iterator` implementation.
pub struct ModelBatchReader {
    // Stream header captured when emission started.
    header: CityArrowHeader,
    // Projection layout the batches were emitted with.
    projection: ProjectionLayout,
    // Buffered batches, drained front-to-back by `next`.
    batches: VecDeque<(CanonicalTable, RecordBatch)>,
}
72
impl ModelBatchReader {
    /// Header describing the encoded stream.
    #[must_use]
    pub const fn header(&self) -> &CityArrowHeader {
        &self.header
    }

    /// Projection layout the batches were emitted with.
    #[must_use]
    pub const fn projection(&self) -> &ProjectionLayout {
        &self.projection
    }
}
84
impl Iterator for ModelBatchReader {
    type Item = (CanonicalTable, RecordBatch);

    // Pops buffered batches front-to-back, preserving emission order.
    fn next(&mut self) -> Option<Self::Item> {
        self.batches.pop_front()
    }
}
92
/// Incremental model decoder: feed batches one at a time, then call `finish`
/// to materialize the model. Thin newtype over `convert::IncrementalDecoder`.
pub struct ModelBatchDecoder(convert::IncrementalDecoder);
94
95impl ModelBatchDecoder {
96 pub fn new(
101 header: CityArrowHeader,
102 projection: ProjectionLayout,
103 options: &ImportOptions,
104 ) -> Result<Self> {
105 validate_expected_schema_version(options, header.package_version)?;
106 convert::IncrementalDecoder::new(header, projection).map(Self)
107 }
108
109 pub fn push_batch(&mut self, table: CanonicalTable, batch: &RecordBatch) -> Result<()> {
113 self.0.push_batch(table, batch)
114 }
115
116 pub fn finish(self) -> Result<OwnedCityModel> {
121 self.0.finish()
122 }
123}
124
125pub fn export_reader(model: &OwnedCityModel, options: &ExportOptions) -> Result<ModelBatchReader> {
129 validate_export_schema_version(options)?;
130 let relational = model.relational();
131 let mut sink = BatchReaderSink::default();
132 convert::emit_tables(&relational, &mut sink)?;
133 sink.finish()
134}
135
136pub fn import_batches<I>(
140 header: CityArrowHeader,
141 projection: ProjectionLayout,
142 batches: I,
143 options: &ImportOptions,
144) -> Result<OwnedCityModel>
145where
146 I: IntoIterator<Item = (CanonicalTable, RecordBatch)>,
147{
148 let mut decoder = ModelBatchDecoder::new(header, projection, options)?;
149 for (table, batch) in batches {
150 decoder.push_batch(table, &batch)?;
151 }
152 decoder.finish()
153}
154
/// Serializes `model` as a CityArrow stream into `writer`.
///
/// # Errors
///
/// Fails when the requested schema version is unsupported, or when the
/// underlying stream writer fails.
///
/// NOTE(review): only `options.schema_version` is consulted here;
/// `batch_row_limit` and `dictionary_encode_strings` are not forwarded to
/// `stream::write_model_stream` — confirm they are applied elsewhere.
pub fn write_stream<W: Write>(
    writer: W,
    model: &OwnedCityModel,
    options: &ExportOptions,
) -> Result<WriteReport> {
    validate_export_schema_version(options)?;
    let relational = model.relational();
    stream::write_model_stream(&relational, writer)
}
167
168pub fn read_stream<R: Read>(reader: R, options: &ImportOptions) -> Result<OwnedCityModel> {
172 let (header, projection, batches) = stream::read_stream_batches(reader)?;
173 let ordered_batches = batches
174 .into_iter()
175 .map(|(table, expected_rows, batch)| {
176 if batch.num_rows() == expected_rows {
177 Ok((table, batch))
178 } else {
179 Err(Error::Conversion(format!(
180 "{} frame declared {expected_rows} rows but decoded {} rows",
181 table.as_str(),
182 batch.num_rows()
183 )))
184 }
185 })
186 .collect::<Result<Vec<_>>>()?;
187 import_batches(header, projection, ordered_batches, options)
188}
189
190fn validate_export_schema_version(options: &ExportOptions) -> Result<()> {
191 if options.schema_version == SchemaVersion::V3Alpha3 {
192 Ok(())
193 } else {
194 Err(Error::Unsupported(format!(
195 "schema version '{}' is not supported by this crate",
196 options.schema_version.package_version().as_str()
197 )))
198 }
199}
200
201fn validate_expected_schema_version(
202 options: &ImportOptions,
203 actual: CityArrowPackageVersion,
204) -> Result<()> {
205 if let Some(expected) = options.expected_schema_version
206 && expected.package_version() != actual
207 {
208 return Err(Error::Unsupported(format!(
209 "stream uses '{}' but '{}' was requested",
210 actual.as_str(),
211 expected.package_version().as_str()
212 )));
213 }
214 Ok(())
215}
216
/// Sink that buffers the header, projection, and batches emitted by
/// `convert::emit_tables`, for later conversion into a `ModelBatchReader`.
#[derive(Default)]
struct BatchReaderSink {
    // Populated by `CanonicalTableSink::start`; `None` until then.
    header: Option<CityArrowHeader>,
    projection: Option<ProjectionLayout>,
    // Batches in emission order.
    batches: VecDeque<(CanonicalTable, RecordBatch)>,
}
223
224impl BatchReaderSink {
225 fn finish(self) -> Result<ModelBatchReader> {
226 Ok(ModelBatchReader {
227 header: self
228 .header
229 .ok_or_else(|| Error::Conversion("missing canonical table header".to_string()))?,
230 projection: self.projection.ok_or_else(|| {
231 Error::Conversion("missing canonical table projection".to_string())
232 })?,
233 batches: self.batches,
234 })
235 }
236}
237
238impl CanonicalTableSink for BatchReaderSink {
239 fn start(&mut self, header: &CityArrowHeader, projection: &ProjectionLayout) -> Result<()> {
240 self.header = Some(header.clone());
241 self.projection = Some(projection.clone());
242 Ok(())
243 }
244
245 fn push_batch(&mut self, table: CanonicalTable, batch: RecordBatch) -> Result<()> {
246 self.batches.push_back((table, batch));
247 Ok(())
248 }
249}