// utiles/mbt/stream_writer.rs

1use futures::StreamExt;
2use rusqlite::params_from_iter;
3use rusqlite::types::Value;
4use tokio_stream::wrappers::ReceiverStream;
5use tracing::{debug, warn};
6
7use utiles_core::{Tile, TileLike};
8
9use crate::hash::xxh64_be_hex_upper;
10use crate::mbt::{MbtType, Mbtiles};
11use crate::sqlite::InsertStrategy;
12use crate::{UtilesError, UtilesResult};
13
14pub enum MbtWriterStreamData {
15    Tile(Tile, Vec<u8>, Option<String>),
16    Metadata(String, String),
17}
18
19impl From<(Tile, Vec<u8>, Option<String>)> for MbtWriterStreamData {
20    fn from(data: (Tile, Vec<u8>, Option<String>)) -> Self {
21        Self::Tile(data.0, data.1, data.2)
22    }
23}
24
/// Running totals kept by the MBTiles stream writer while it inserts tiles.
///
/// Plain counters, so the struct is `Copy` and can be logged, compared, or
/// snapshotted without borrowing the writer.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct MbtWriterStats {
    /// Number of tiles recorded as written.
    pub count: usize,
    /// Total size in bytes of the tile data recorded as written.
    pub nbytes: usize,
}
30
31pub struct MbtStreamWriterSync {
32    pub stream: ReceiverStream<MbtWriterStreamData>,
33    pub mbt: Mbtiles,
34    pub on_conflict: InsertStrategy,
35    pub stats: MbtWriterStats,
36}
37
38impl MbtStreamWriterSync {
39    pub fn preflight(&self) -> UtilesResult<()> {
40        self.mbt
41            .conn
42            .execute_batch(
43                r"
44            PRAGMA synchronous = OFF;
45            PRAGMA journal_mode = WAL;
46            PRAGMA locking_mode = EXCLUSIVE;
47            PRAGMA temp_store = MEMORY;
48            PRAGMA cache_size = 100000;
49            ",
50            )
51            .map_err(Into::into)
52    }
53
54    pub fn postflight(&self) -> UtilesResult<()> {
55        self.mbt
56            .conn
57            .execute_batch(
58                r"
59            PRAGMA synchronous = NORMAL;
60            PRAGMA journal_mode = DELETE;
61            PRAGMA locking_mode = NORMAL;
62            PRAGMA temp_store = DEFAULT;
63            PRAGMA cache_size = 2000;
64            ",
65            )
66            .map_err(Into::into)
67    }
68
69    pub async fn write_flat(&mut self) -> UtilesResult<()> {
70        let stmt_str = match self.on_conflict {
71            InsertStrategy::Ignore => {
72                "INSERT OR IGNORE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
73            }
74            InsertStrategy::Replace => {
75                "INSERT OR REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
76            }
77            InsertStrategy::Abort => {
78                "INSERT OR ABORT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
79            }
80            InsertStrategy::Rollback => {
81                "INSERT OR ROLLBACK INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
82            }
83            _ => {
84                "INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
85            }
86        };
87        let mut stmt = self.mbt.conn.prepare(stmt_str)?;
88        let stream = &mut self.stream;
89        while let Some(value) = stream.next().await {
90            match value {
91                MbtWriterStreamData::Metadata(_key, _value) => {
92                    warn!("Writing metadata not yet supported");
93                }
94                MbtWriterStreamData::Tile(tile, tile_data, _) => {
95                    let tile_params =
96                        rusqlite::params![tile.z, tile.x, tile.yup(), tile_data];
97                    let insert_res = stmt.execute(tile_params);
98                    if let Err(e) = insert_res {
99                        warn!("insert_res: {:?}", e);
100                    } else {
101                        self.stats.count += 1;
102                        self.stats.nbytes += tile_data.len();
103                        debug!(
104                            "count: {}, nbytes: {}",
105                            self.stats.count, self.stats.nbytes
106                        );
107                    }
108                }
109            }
110        }
111        Ok(())
112    }
113
114    pub async fn write_hash(&mut self) -> UtilesResult<()> {
115        let stmt_str = match self.on_conflict {
116            InsertStrategy::Ignore => {
117                "INSERT OR IGNORE INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
118            }
119            InsertStrategy::Replace => {
120                "INSERT OR REPLACE INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
121            }
122            InsertStrategy::Abort => {
123                "INSERT OR ABORT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
124            }
125            InsertStrategy::Rollback => {
126                "INSERT OR ROLLBACK INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
127            }
128            _ => {
129                "INSERT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
130            }
131        };
132        let mut stmt = self.mbt.conn.prepare(stmt_str)?;
133        let stream = &mut self.stream;
134        while let Some(value) = stream.next().await {
135            if let MbtWriterStreamData::Tile(tile, tile_data, hash_hex) = value {
136                let hash_hex =
137                    hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
138                let tile_params =
139                    rusqlite::params![tile.z, tile.x, tile.yup(), tile_data, hash_hex];
140                let insert_res = stmt.execute(tile_params);
141                if let Err(e) = insert_res {
142                    warn!("insert_res: {:?}", e);
143                } else {
144                    self.stats.count += 1;
145                    self.stats.nbytes += tile_data.len();
146                    debug!(
147                        "count: {}, nbytes: {}",
148                        self.stats.count, self.stats.nbytes
149                    );
150                }
151            }
152        }
153        Ok(())
154    }
155
156    pub async fn write_norm(&mut self) -> UtilesResult<()> {
157        let map_stmt_str = match self.on_conflict {
158            InsertStrategy::Ignore => {
159                "INSERT OR IGNORE INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
160            }
161            InsertStrategy::Replace => {
162                "INSERT OR REPLACE INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
163            }
164            InsertStrategy::Abort => {
165                "INSERT OR ABORT INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
166            }
167            InsertStrategy::Rollback => {
168                "INSERT OR ROLLBACK INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
169            }
170            _ => {
171                "INSERT INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
172            }
173        };
174        let mut map_stmt = self.mbt.conn.prepare(map_stmt_str)?;
175        let mut blob_stmt = self.mbt.conn.prepare(
176            "INSERT OR IGNORE INTO images (tile_id, tile_data) VALUES (?1, ?2);",
177        )?;
178        while let Some(value) = self.stream.next().await {
179            match value {
180                MbtWriterStreamData::Tile(tile, tile_data, hash_hex) => {
181                    let hash_hex =
182                        hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
183                    let map_insert_res =
184                        rusqlite::params![tile.z, tile.x, tile.yup(), hash_hex];
185                    let map_insert_res = map_stmt.execute(map_insert_res);
186                    if let Err(e) = map_insert_res {
187                        warn!("insert_res: {:?}", e);
188                    } else {
189                        self.stats.count += 1;
190                        self.stats.nbytes += tile_data.len();
191                        debug!(
192                            "count: {}, nbytes: {}",
193                            self.stats.count, self.stats.nbytes
194                        );
195                    }
196                    let blob_params = rusqlite::params![hash_hex, tile_data];
197                    let insert_res = blob_stmt.execute(blob_params);
198                    if let Err(e) = insert_res {
199                        warn!("blob insert res: {:?}", e);
200                    } else {
201                        self.stats.count += 1;
202                        self.stats.nbytes += tile_data.len();
203                        debug!(
204                            "count: {}, nbytes: {}",
205                            self.stats.count, self.stats.nbytes
206                        );
207                    }
208                }
209                MbtWriterStreamData::Metadata(_key, _value) => {
210                    warn!("Writing metadata not yet supported");
211                }
212            }
213        }
214        Ok(())
215    }
216
217    pub async fn write(&mut self) -> UtilesResult<()> {
218        let db_type = self.mbt.query_mbt_type()?;
219        self.preflight()?;
220        match db_type {
221            MbtType::Flat => self.write_flat().await,
222            MbtType::Hash => self.write_hash().await,
223            MbtType::Norm => self.write_norm().await,
224            _ => Err(UtilesError::Unsupported(
225                "stream write for unknown db type".to_string(),
226            )),
227        }?;
228        self.postflight()?;
229        Ok(())
230    }
231
232    pub async fn write_batched(&mut self) -> UtilesResult<()> {
233        self.preflight()?;
234        let mut batch = vec![];
235        while let Some(value) = self.stream.next().await {
236            match value {
237                MbtWriterStreamData::Metadata(_key, _value) => {
238                    warn!("Writing metadata not yet supported");
239                }
240                MbtWriterStreamData::Tile(tile, tile_data, hash_hex) => {
241                    let hash_hex =
242                        hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
243                    batch.push((tile, tile_data, hash_hex));
244                    if batch.len() >= 100 {
245                        let placeholders = batch
246                            .iter()
247                            .map(|_| "(?, ?, ?, ?, ?)")
248                            .collect::<Vec<_>>()
249                            .join(", ");
250                        let mut stmt = self.mbt.conn.prepare_cached(
251                            &format!("INSERT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES {placeholders};"),
252                        )?;
253                        let param_values: Vec<Value> = batch
254                            .iter()
255                            .flat_map(|(tile, tile_data, hash_hex)| {
256                                vec![
257                                    Value::Integer(i64::from(tile.z())),
258                                    Value::Integer(i64::from(tile.x())),
259                                    Value::Integer(i64::from(tile.yup())),
260                                    Value::Blob(tile_data.clone()),
261                                    Value::Text(hash_hex.clone()),
262                                ]
263                            })
264                            .collect();
265                        let insert_res =
266                            stmt.execute(params_from_iter(param_values.iter()));
267                        batch.clear();
268                        if let Err(e) = insert_res {
269                            warn!("insert_res: {:?}", e);
270                        } else {
271                            debug!(
272                                "count: {}, nbytes: {}",
273                                self.stats.count, self.stats.nbytes
274                            );
275                        }
276                    }
277                }
278            }
279        }
280
281        if !batch.is_empty() {
282            let placeholders = batch
283                .iter()
284                .map(|_| "(?, ?, ?, ?)")
285                .collect::<Vec<_>>()
286                .join(", ");
287            let mut stmt = self.mbt.conn.prepare_cached(
288                &format!("INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES {placeholders};"),
289            )?;
290            let param_values: Vec<Value> = batch
291                .iter()
292                .flat_map(|(tile, tile_data, _)| {
293                    vec![
294                        Value::Integer(i64::from(tile.z())),
295                        Value::Integer(i64::from(tile.x())),
296                        Value::Integer(i64::from(tile.yup())),
297                        Value::Blob(tile_data.clone()),
298                    ]
299                })
300                .collect();
301            let insert_res = stmt.execute(params_from_iter(param_values.iter()));
302            if let Err(e) = insert_res {
303                warn!("insert_res: {:?}", e);
304            } else {
305                debug!("count: {}, nbytes: {}", self.stats.count, self.stats.nbytes);
306            }
307        }
308        self.postflight()?;
309        Ok(())
310    }
311}