utiles 0.8.0

Web map tile utils (aka utiles)
Documentation
use futures::StreamExt;
use rusqlite::params_from_iter;
use rusqlite::types::Value;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, warn};

use utiles_core::{Tile, TileLike};

use crate::hash::xxh64_be_hex_upper;
use crate::mbt::{MbtType, Mbtiles};
use crate::sqlite::InsertStrategy;
use crate::{UtilesError, UtilesResult};

pub enum MbtWriterStreamData {
    Tile(Tile, Vec<u8>, Option<String>),
    Metadata(String, String),
}

impl From<(Tile, Vec<u8>, Option<String>)> for MbtWriterStreamData {
    fn from(data: (Tile, Vec<u8>, Option<String>)) -> Self {
        Self::Tile(data.0, data.1, data.2)
    }
}

#[derive(Default)]
pub struct MbtWriterStats {
    pub count: usize,
    pub nbytes: usize,
}

pub struct MbtStreamWriterSync {
    pub stream: ReceiverStream<MbtWriterStreamData>,
    pub mbt: Mbtiles,
    pub on_conflict: InsertStrategy,
    pub stats: MbtWriterStats,
}

impl MbtStreamWriterSync {
    pub fn preflight(&self) -> UtilesResult<()> {
        self.mbt
            .conn
            .execute_batch(
                r"
            PRAGMA synchronous = OFF;
            PRAGMA journal_mode = WAL;
            PRAGMA locking_mode = EXCLUSIVE;
            PRAGMA temp_store = MEMORY;
            PRAGMA cache_size = 100000;
            ",
            )
            .map_err(Into::into)
    }

    pub fn postflight(&self) -> UtilesResult<()> {
        self.mbt
            .conn
            .execute_batch(
                r"
            PRAGMA synchronous = NORMAL;
            PRAGMA journal_mode = DELETE;
            PRAGMA locking_mode = NORMAL;
            PRAGMA temp_store = DEFAULT;
            PRAGMA cache_size = 2000;
            ",
            )
            .map_err(Into::into)
    }

    pub async fn write_flat(&mut self) -> UtilesResult<()> {
        let stmt_str = match self.on_conflict {
            InsertStrategy::Ignore => {
                "INSERT OR IGNORE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
            }
            InsertStrategy::Replace => {
                "INSERT OR REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
            }
            InsertStrategy::Abort => {
                "INSERT OR ABORT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
            }
            InsertStrategy::Rollback => {
                "INSERT OR ROLLBACK INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
            }
            _ => {
                "INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?1, ?2, ?3, ?4);"
            }
        };
        let mut stmt = self.mbt.conn.prepare(stmt_str)?;
        let stream = &mut self.stream;
        while let Some(value) = stream.next().await {
            match value {
                MbtWriterStreamData::Metadata(_key, _value) => {
                    warn!("Writing metadata not yet supported");
                }
                MbtWriterStreamData::Tile(tile, tile_data, _) => {
                    let tile_params =
                        rusqlite::params![tile.z, tile.x, tile.yup(), tile_data];
                    let insert_res = stmt.execute(tile_params);
                    if let Err(e) = insert_res {
                        warn!("insert_res: {:?}", e);
                    } else {
                        self.stats.count += 1;
                        self.stats.nbytes += tile_data.len();
                        debug!(
                            "count: {}, nbytes: {}",
                            self.stats.count, self.stats.nbytes
                        );
                    }
                }
            }
        }
        Ok(())
    }

    pub async fn write_hash(&mut self) -> UtilesResult<()> {
        let stmt_str = match self.on_conflict {
            InsertStrategy::Ignore => {
                "INSERT OR IGNORE INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
            }
            InsertStrategy::Replace => {
                "INSERT OR REPLACE INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
            }
            InsertStrategy::Abort => {
                "INSERT OR ABORT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
            }
            InsertStrategy::Rollback => {
                "INSERT OR ROLLBACK INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
            }
            _ => {
                "INSERT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES (?1, ?2, ?3, ?4, ?5);"
            }
        };
        let mut stmt = self.mbt.conn.prepare(stmt_str)?;
        let stream = &mut self.stream;
        while let Some(value) = stream.next().await {
            if let MbtWriterStreamData::Tile(tile, tile_data, hash_hex) = value {
                let hash_hex =
                    hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
                let tile_params =
                    rusqlite::params![tile.z, tile.x, tile.yup(), tile_data, hash_hex];
                let insert_res = stmt.execute(tile_params);
                if let Err(e) = insert_res {
                    warn!("insert_res: {:?}", e);
                } else {
                    self.stats.count += 1;
                    self.stats.nbytes += tile_data.len();
                    debug!(
                        "count: {}, nbytes: {}",
                        self.stats.count, self.stats.nbytes
                    );
                }
            }
        }
        Ok(())
    }

    pub async fn write_norm(&mut self) -> UtilesResult<()> {
        let map_stmt_str = match self.on_conflict {
            InsertStrategy::Ignore => {
                "INSERT OR IGNORE INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
            }
            InsertStrategy::Replace => {
                "INSERT OR REPLACE INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
            }
            InsertStrategy::Abort => {
                "INSERT OR ABORT INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
            }
            InsertStrategy::Rollback => {
                "INSERT OR ROLLBACK INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
            }
            _ => {
                "INSERT INTO map (zoom_level, tile_column, tile_row, tile_id) VALUES (?1, ?2, ?3, ?4);"
            }
        };
        let mut map_stmt = self.mbt.conn.prepare(map_stmt_str)?;
        let mut blob_stmt = self.mbt.conn.prepare(
            "INSERT OR IGNORE INTO images (tile_id, tile_data) VALUES (?1, ?2);",
        )?;
        while let Some(value) = self.stream.next().await {
            match value {
                MbtWriterStreamData::Tile(tile, tile_data, hash_hex) => {
                    let hash_hex =
                        hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
                    let map_insert_res =
                        rusqlite::params![tile.z, tile.x, tile.yup(), hash_hex];
                    let map_insert_res = map_stmt.execute(map_insert_res);
                    if let Err(e) = map_insert_res {
                        warn!("insert_res: {:?}", e);
                    } else {
                        self.stats.count += 1;
                        self.stats.nbytes += tile_data.len();
                        debug!(
                            "count: {}, nbytes: {}",
                            self.stats.count, self.stats.nbytes
                        );
                    }
                    let blob_params = rusqlite::params![hash_hex, tile_data];
                    let insert_res = blob_stmt.execute(blob_params);
                    if let Err(e) = insert_res {
                        warn!("blob insert res: {:?}", e);
                    } else {
                        self.stats.count += 1;
                        self.stats.nbytes += tile_data.len();
                        debug!(
                            "count: {}, nbytes: {}",
                            self.stats.count, self.stats.nbytes
                        );
                    }
                }
                MbtWriterStreamData::Metadata(_key, _value) => {
                    warn!("Writing metadata not yet supported");
                }
            }
        }
        Ok(())
    }

    pub async fn write(&mut self) -> UtilesResult<()> {
        let db_type = self.mbt.query_mbt_type()?;
        self.preflight()?;
        match db_type {
            MbtType::Flat => self.write_flat().await,
            MbtType::Hash => self.write_hash().await,
            MbtType::Norm => self.write_norm().await,
            _ => Err(UtilesError::Unsupported(
                "stream write for unknown db type".to_string(),
            )),
        }?;
        self.postflight()?;
        Ok(())
    }

    pub async fn write_batched(&mut self) -> UtilesResult<()> {
        self.preflight()?;
        let mut batch = vec![];
        while let Some(value) = self.stream.next().await {
            match value {
                MbtWriterStreamData::Metadata(_key, _value) => {
                    warn!("Writing metadata not yet supported");
                }
                MbtWriterStreamData::Tile(tile, tile_data, hash_hex) => {
                    let hash_hex =
                        hash_hex.unwrap_or_else(|| xxh64_be_hex_upper(&tile_data));
                    batch.push((tile, tile_data, hash_hex));
                    if batch.len() >= 100 {
                        let placeholders = batch
                            .iter()
                            .map(|_| "(?, ?, ?, ?, ?)")
                            .collect::<Vec<_>>()
                            .join(", ");
                        let mut stmt = self.mbt.conn.prepare_cached(
                            &format!("INSERT INTO tiles_with_hash (zoom_level, tile_column, tile_row, tile_data, tile_hash) VALUES {placeholders};"),
                        )?;
                        let param_values: Vec<Value> = batch
                            .iter()
                            .flat_map(|(tile, tile_data, hash_hex)| {
                                vec![
                                    Value::Integer(i64::from(tile.z())),
                                    Value::Integer(i64::from(tile.x())),
                                    Value::Integer(i64::from(tile.yup())),
                                    Value::Blob(tile_data.clone()),
                                    Value::Text(hash_hex.clone()),
                                ]
                            })
                            .collect();
                        let insert_res =
                            stmt.execute(params_from_iter(param_values.iter()));
                        batch.clear();
                        if let Err(e) = insert_res {
                            warn!("insert_res: {:?}", e);
                        } else {
                            debug!(
                                "count: {}, nbytes: {}",
                                self.stats.count, self.stats.nbytes
                            );
                        }
                    }
                }
            }
        }

        if !batch.is_empty() {
            let placeholders = batch
                .iter()
                .map(|_| "(?, ?, ?, ?)")
                .collect::<Vec<_>>()
                .join(", ");
            let mut stmt = self.mbt.conn.prepare_cached(
                &format!("INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES {placeholders};"),
            )?;
            let param_values: Vec<Value> = batch
                .iter()
                .flat_map(|(tile, tile_data, _)| {
                    vec![
                        Value::Integer(i64::from(tile.z())),
                        Value::Integer(i64::from(tile.x())),
                        Value::Integer(i64::from(tile.yup())),
                        Value::Blob(tile_data.clone()),
                    ]
                })
                .collect();
            let insert_res = stmt.execute(params_from_iter(param_values.iter()));
            if let Err(e) = insert_res {
                warn!("insert_res: {:?}", e);
            } else {
                debug!("count: {}, nbytes: {}", self.stats.count, self.stats.nbytes);
            }
        }
        self.postflight()?;
        Ok(())
    }
}