bbox_tile_server/
seed.rs

1use crate::cli::*;
2use crate::config::TileStoreCfg;
3use crate::filter_params::FilterParams;
4use crate::service::{ServiceError, TileService};
5use crate::store::{s3putfiles, CacheLayout};
6use futures::{prelude::*, stream};
7use indicatif::{ProgressBar, ProgressStyle};
8use log::info;
9use par_stream::prelude::*;
10use std::path::PathBuf;
11use std::sync::Arc;
12use tile_grid::BoundingBox;
13
14fn progress_bar() -> ProgressBar {
15    let progress = ProgressBar::new_spinner();
16    progress.set_style(
17        ProgressStyle::default_spinner()
18            .template("{elapsed_precise} ({per_sec}) {spinner} {pos} {msg}"),
19    );
20    progress
21}
22
23/*
24
25# Tile seeder workflows
26
27By-Grid (Raster):
28* Iterate over grid with filters
29* Request tile data
* Store tile

File upload:
32* Iterate over files in directory
33* Read file
34* Put file
35
36By-Grid (Vector):
37* Iterate over grid with filters
38* Request tile data
39* Clip data
40* Generalize data
41* Generate tile
42* Store tile
43
44By-Feature (https://github.com/onthegomap/planetiler/blob/main/ARCHITECTURE.md):
45* Iterate over features with filters
46* Generalize for zoom levels
47* Collect data into grid tiles
48* Generate tile
49* Store tile
50
51*/
52
53impl TileService {
54    pub async fn seed_by_grid(&self, args: &SeedArgs) -> anyhow::Result<()> {
55        let progress = progress_bar();
56        let progress_main = progress.clone();
57
58        let tileset = self
59            .tileset(&args.tileset)
60            .ok_or(ServiceError::TilesetNotFound(args.tileset.clone()))?
61            .clone();
62        let tileset_arc = Arc::new(tileset.clone());
63        let tms = Arc::new(
64            if let Some(tms_id) = &args.tms {
65                tileset.grid(tms_id)?
66            } else {
67                tileset.default_grid(0)?
68            }
69            .clone(),
70        );
71        let format = *tileset.tile_format();
72
73        let bbox = if let Some(numlist) = &args.extent {
74            let arr: Vec<f64> = numlist
75                .split(',')
76                .map(|v| {
77                    v.parse()
78                        .expect("Error parsing 'extent' as list of float values")
79                })
80                .collect();
81            if arr.len() != 4 {
82                anyhow::bail!("Invalid extent (minx,miny,maxx,maxy)");
83            }
84            BoundingBox::new(arr[0], arr[1], arr[2], arr[3])
85        } else {
86            tms.xy_bbox()
87        };
88
89        let Some(cache_cfg) = tileset.cache_config() else {
90            return Err(
91                ServiceError::TilesetNotFound("Cache configuration not found".to_string()).into(),
92            );
93        };
94        let tile_writer = Arc::new(tileset.store_writer.clone().unwrap());
95        let tile_store_writer = tileset.store_writer.clone().unwrap();
96        let compression = tile_writer.compression();
97
98        // Number of worker threads (size >= #cores).
99        let threads = args.threads.unwrap_or(num_cpus::get());
100
101        let minzoom = args.minzoom.unwrap_or(0);
102        let maxzoom = args.maxzoom.unwrap_or(tms.maxzoom());
103        let griditer = tms.xyz_iterator(&bbox, minzoom, maxzoom);
104        info!("Seeding tiles from level {minzoom} to {maxzoom}");
105
106        // We setup different pipelines for certain scenarios.
107        // Examples:
108        // map service source -> tile store writer
109        // map service source -> batch collector -> mbtiles store writer
110
111        let iter = griditer.map(move |xyz| {
112            let path = CacheLayout::Zxy.path_string(&PathBuf::new(), &xyz, &format);
113            progress.set_message(path.clone());
114            progress.inc(1);
115            xyz
116        });
117        let par_stream = stream::iter(iter).par_then(threads, move |xyz| {
118            let tileset = tileset_arc.clone();
119            let tms = tms.clone(); // TODO: tileset.default_grid(xyz.z)
120            let filter = FilterParams::default();
121            let compression = compression.clone();
122            async move {
123                let tile = tileset
124                    .read_tile(&tms, &xyz, &filter, &format, compression)
125                    .await
126                    .unwrap();
127                (xyz, tile)
128            }
129        });
130
131        match cache_cfg {
132            TileStoreCfg::Files(_cfg) => {
133                par_stream
134                    .par_then(threads, move |(xyz, tile)| {
135                        let tile_writer = tile_writer.clone();
136                        async move {
137                            let _ = tile_writer.put_tile(&xyz, tile).await;
138                        }
139                    })
140                    .count()
141                    .await;
142            }
143            TileStoreCfg::S3(cfg) => {
144                info!("Writing tiles to {}", &cfg.path);
145                let s3_writer_thread_count = args.tasks.unwrap_or(256);
146                par_stream
147                    .par_then(s3_writer_thread_count, move |(xyz, tile)| {
148                        let s3_writer = tile_writer.clone();
149                        async move {
150                            let _ = s3_writer.put_tile(&xyz, tile).await;
151                        }
152                    })
153                    .count()
154                    .await;
155            }
156            TileStoreCfg::Mbtiles(_) | TileStoreCfg::Pmtiles(_) => {
157                let tile_writer = tile_store_writer.clone();
158                let batch_size = 200; // For MBTiles, create the largest prepared statement supported by SQLite (999 parameters)
159                par_stream
160                    .stateful_batching(tile_writer, |mut tile_writer, mut stream| async move {
161                        let mut batch = Vec::with_capacity(batch_size);
162                        while let Some((xyz, tile)) = stream.next().await {
163                            batch.push((xyz.z, xyz.x as u32, xyz.y as u32, tile));
164                            // let _ = tile_writer.put_tile_mut(&xyz, tile).await;
165                            // batch.push((xyz.z, xyz.x as u32, xyz.y as u32, Vec::<u8>::new()));
166                            if batch.len() >= batch.capacity() {
167                                break;
168                            }
169                        }
170                        let empty = batch.is_empty();
171                        let _ = tile_writer.put_tiles(&batch).await;
172                        if empty {
173                            let _ = tile_writer.finalize();
174                        }
175                        (!empty).then_some(((), tile_writer, stream))
176                    })
177                    .count()
178                    .await;
179            }
180            TileStoreCfg::NoStore => {
181                par_stream.count().await;
182            }
183        };
184
185        progress_main.set_style(
186            ProgressStyle::default_spinner().template("{elapsed_precise} ({per_sec}) {msg}"),
187        );
188        let cnt = progress_main.position() + 1;
189        let elapsed = progress_main.elapsed().as_millis() as f64 / 1000.0;
190        progress_main.finish_with_message(format!("{cnt} tiles generated in {elapsed:.2}s"));
191
192        Ok(())
193    }
194
195    pub async fn upload(&self, args: &UploadArgs) -> anyhow::Result<()> {
196        match args.mode {
197            Mode::Sequential => s3putfiles::put_files_seq(args).await,
198            Mode::Tasks => s3putfiles::put_files_tasks(args).await,
199            Mode::Channels => s3putfiles::put_files_channels(args).await,
200        }
201    }
202}