1use futures::StreamExt;
2use rusqlite::params_from_iter;
3use rusqlite::types::Value;
4use tokio_stream::wrappers::ReceiverStream;
5use tracing::{debug, warn};
6
7use utiles_core::{Tile, TileLike};
8
9use crate::hash::xxh64_be_hex_upper;
10use crate::mbt::{MbtType, Mbtiles};
11use crate::sqlite::InsertStrategy;
12use crate::{UtilesError, UtilesResult};
13
14pub enum MbtWriterStreamData {
15 Tile(Tile, Vec<u8>, Option<String>),
16 Metadata(String, String),
17}
18
19impl From<(Tile, Vec<u8>, Option<String>)> for MbtWriterStreamData {
20 fn from(data: (Tile, Vec<u8>, Option<String>)) -> Self {
21 Self::Tile(data.0, data.1, data.2)
22 }
23}
24
/// Running totals kept by a stream writer while it inserts tiles.
///
/// Both counters start at zero (`Default`).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct MbtWriterStats {
    /// Number of tile inserts recorded by the writer.
    pub count: usize,
    /// Sum of the tile payload lengths (in bytes) recorded by the writer.
    pub nbytes: usize,
}
30
31pub struct MbtStreamWriterSync {
32 pub stream: ReceiverStream<MbtWriterStreamData>,
33 pub mbt: Mbtiles,
34 pub on_conflict: InsertStrategy,
35 pub stats: MbtWriterStats,
36}
37
38impl MbtStreamWriterSync {
39 pub fn preflight(&self) -> UtilesResult<()> {
40 self.mbt
41 .conn
42 .execute_batch(
43 r"
44 PRAGMA synchronous = OFF;
45 PRAGMA journal_mode = WAL;
46 PRAGMA locking_mode = EXCLUSIVE;
47 PRAGMA temp_store = MEMORY;
48 PRAGMA cache_size = 100000;
49 ",
50 )
51 .map_err(Into::into)
52 }
53
54 pub fn postflight(&self) -> UtilesResult<()> {
55 self.mbt
56 .conn
57 .execute_batch(
58 r"
59 PRAGMA synchronous = NORMAL;
60 PRAGMA journal_mode = DELETE;
61 PRAGMA locking_mode = NORMAL;
62 PRAGMA temp_store = DEFAULT;
63 PRAGMA cache_size = 2000;
64 ",
65 )
66 .map_err(Into::into)
67 }
68
69 pub async fn write_flat(&mut self) -> UtilesResult<()> {
70 let stmt_str = match self.on_conflict {
71 InsertStrategy::Ignore => {
72 "INSERT OR IGNORE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
73 }
74 InsertStrategy::Replace => {
75 "INSERT OR REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
76 }
77 InsertStrategy::Abort => {
78 "INSERT OR ABORT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
79 }
80 InsertStrategy::Rollback => {
81 "INSERT OR ROLLBACK INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
82 }
83 _ => {
84 "INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
85 }
86 };
87 let mut stmt = self.mbt.conn.prepare(stmt_str)?;
88 let stream = &mut self.stream;
89 while let Some(value) = stream.next().await {
90 match value {
91 MbtWriterStreamData::Metadata(_key, _value) => {
92 warn!("Writing metadata not yet supported");
93 }
94 MbtWriterStreamData::Tile(tile, tile_data, _) => {
95 let tile_params =
96 rusqlite::params![tile.z, tile.x, tile.yup(), tile_data];
97 let insert_res = stmt.execute(tile_params);
98 if let Err(e) = insert_res {
99 warn!("insert_res: {:?}", e);
100 } else {
101 self.stats.count += 1;
102 self.stats.nbytes += tile_data.len();
103 debug!(
104 "count: {}, nbytes: {}",
105 self.stats.count, self.stats.nbytes
106 );
107 }
108 }
109 }
110 }
111 Ok(())
112 }
113
114 pub async fn write_hash(&mut self) -> UtilesResult<()> {
115 let stmt_str = match self.on_conflict {
116 InsertStrategy::Ignore => {
117 "INSERT OR IGNORE INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
118 }
119 InsertStrategy::Replace => {
120 "INSERT OR REPLACE INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
121 }
122 InsertStrategy::Abort => {
123 "INSERT OR ABORT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
124 }
125 InsertStrategy::Rollback => {
126 "INSERT OR ROLLBACK INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
127 }
128 _ => {
129 "INSERT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
130 }
131 };
132 let mut stmt = self.mbt.conn.prepare(stmt_str)?;
133 let stream = &mut self.stream;
134 while let Some(value) = stream.next().await {
135 if let MbtWriterStreamData::Tile(tile, tile_data, hash_hex) = value {
136 let hash_hex =
137 hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
138 let tile_params =
139 rusqlite::params![tile.z, tile.x, tile.yup(), tile_data, hash_hex];
140 let insert_res = stmt.execute(tile_params);
141 if let Err(e) = insert_res {
142 warn!("insert_res: {:?}", e);
143 } else {
144 self.stats.count += 1;
145 self.stats.nbytes += tile_data.len();
146 debug!(
147 "count: {}, nbytes: {}",
148 self.stats.count, self.stats.nbytes
149 );
150 }
151 }
152 }
153 Ok(())
154 }
155
156 pub async fn write_norm(&mut self) -> UtilesResult<()> {
157 let map_stmt_str = match self.on_conflict {
158 InsertStrategy::Ignore => {
159 "INSERT OR IGNORE INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
160 }
161 InsertStrategy::Replace => {
162 "INSERT OR REPLACE INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
163 }
164 InsertStrategy::Abort => {
165 "INSERT OR ABORT INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
166 }
167 InsertStrategy::Rollback => {
168 "INSERT OR ROLLBACK INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
169 }
170 _ => {
171 "INSERT INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
172 }
173 };
174 let mut map_stmt = self.mbt.conn.prepare(map_stmt_str)?;
175 let mut blob_stmt = self.mbt.conn.prepare(
176 "INSERT OR IGNORE INTO images (tile_id, tile_data) VALUES (?1, ?2);",
177 )?;
178 while let Some(value) = self.stream.next().await {
179 match value {
180 MbtWriterStreamData::Tile(tile, tile_data, hash_hex) => {
181 let hash_hex =
182 hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
183 let map_insert_res =
184 rusqlite::params![tile.z, tile.x, tile.yup(), hash_hex];
185 let map_insert_res = map_stmt.execute(map_insert_res);
186 if let Err(e) = map_insert_res {
187 warn!("insert_res: {:?}", e);
188 } else {
189 self.stats.count += 1;
190 self.stats.nbytes += tile_data.len();
191 debug!(
192 "count: {}, nbytes: {}",
193 self.stats.count, self.stats.nbytes
194 );
195 }
196 let blob_params = rusqlite::params![hash_hex, tile_data];
197 let insert_res = blob_stmt.execute(blob_params);
198 if let Err(e) = insert_res {
199 warn!("blob insert res: {:?}", e);
200 } else {
201 self.stats.count += 1;
202 self.stats.nbytes += tile_data.len();
203 debug!(
204 "count: {}, nbytes: {}",
205 self.stats.count, self.stats.nbytes
206 );
207 }
208 }
209 MbtWriterStreamData::Metadata(_key, _value) => {
210 warn!("Writing metadata not yet supported");
211 }
212 }
213 }
214 Ok(())
215 }
216
217 pub async fn write(&mut self) -> UtilesResult<()> {
218 let db_type = self.mbt.query_mbt_type()?;
219 self.preflight()?;
220 match db_type {
221 MbtType::Flat => self.write_flat().await,
222 MbtType::Hash => self.write_hash().await,
223 MbtType::Norm => self.write_norm().await,
224 _ => Err(UtilesError::Unsupported(
225 "stream write for unknown db type".to_string(),
226 )),
227 }?;
228 self.postflight()?;
229 Ok(())
230 }
231
232 pub async fn write_batched(&mut self) -> UtilesResult<()> {
233 self.preflight()?;
234 let mut batch = vec![];
235 while let Some(value) = self.stream.next().await {
236 match value {
237 MbtWriterStreamData::Metadata(_key, _value) => {
238 warn!("Writing metadata not yet supported");
239 }
240 MbtWriterStreamData::Tile(tile, tile_data, hash_hex) => {
241 let hash_hex =
242 hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
243 batch.push((tile, tile_data, hash_hex));
244 if batch.len() >= 100 {
245 let placeholders = batch
246 .iter()
247 .map(|_| "(?, ?, ?, ?, ?)")
248 .collect::<Vec<_>>()
249 .join(", ");
250 let mut stmt = self.mbt.conn.prepare_cached(
251 &format!("INSERT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES {placeholders};"),
252 )?;
253 let param_values: Vec<Value> = batch
254 .iter()
255 .flat_map(|(tile, tile_data, hash_hex)| {
256 vec![
257 Value::Integer(i64::from(tile.z())),
258 Value::Integer(i64::from(tile.x())),
259 Value::Integer(i64::from(tile.yup())),
260 Value::Blob(tile_data.clone()),
261 Value::Text(hash_hex.clone()),
262 ]
263 })
264 .collect();
265 let insert_res =
266 stmt.execute(params_from_iter(param_values.iter()));
267 batch.clear();
268 if let Err(e) = insert_res {
269 warn!("insert_res: {:?}", e);
270 } else {
271 debug!(
272 "count: {}, nbytes: {}",
273 self.stats.count, self.stats.nbytes
274 );
275 }
276 }
277 }
278 }
279 }
280
281 if !batch.is_empty() {
282 let placeholders = batch
283 .iter()
284 .map(|_| "(?, ?, ?, ?)")
285 .collect::<Vec<_>>()
286 .join(", ");
287 let mut stmt = self.mbt.conn.prepare_cached(
288 &format!("INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES {placeholders};"),
289 )?;
290 let param_values: Vec<Value> = batch
291 .iter()
292 .flat_map(|(tile, tile_data, _)| {
293 vec![
294 Value::Integer(i64::from(tile.z())),
295 Value::Integer(i64::from(tile.x())),
296 Value::Integer(i64::from(tile.yup())),
297 Value::Blob(tile_data.clone()),
298 ]
299 })
300 .collect();
301 let insert_res = stmt.execute(params_from_iter(param_values.iter()));
302 if let Err(e) = insert_res {
303 warn!("insert_res: {:?}", e);
304 } else {
305 debug!("count: {}, nbytes: {}", self.stats.count, self.stats.nbytes);
306 }
307 }
308 self.postflight()?;
309 Ok(())
310 }
311}