1use crate::cli::*;
2use crate::config::TileStoreCfg;
3use crate::filter_params::FilterParams;
4use crate::service::{ServiceError, TileService};
5use crate::store::{s3putfiles, CacheLayout};
6use futures::{prelude::*, stream};
7use indicatif::{ProgressBar, ProgressStyle};
8use log::info;
9use par_stream::prelude::*;
10use std::path::PathBuf;
11use std::sync::Arc;
12use tile_grid::BoundingBox;
13
14fn progress_bar() -> ProgressBar {
15 let progress = ProgressBar::new_spinner();
16 progress.set_style(
17 ProgressStyle::default_spinner()
18 .template("{elapsed_precise} ({per_sec}) {spinner} {pos} {msg}"),
19 );
20 progress
21}
22
23impl TileService {
54 pub async fn seed_by_grid(&self, args: &SeedArgs) -> anyhow::Result<()> {
55 let progress = progress_bar();
56 let progress_main = progress.clone();
57
58 let tileset = self
59 .tileset(&args.tileset)
60 .ok_or(ServiceError::TilesetNotFound(args.tileset.clone()))?
61 .clone();
62 let tileset_arc = Arc::new(tileset.clone());
63 let tms = Arc::new(
64 if let Some(tms_id) = &args.tms {
65 tileset.grid(tms_id)?
66 } else {
67 tileset.default_grid(0)?
68 }
69 .clone(),
70 );
71 let format = *tileset.tile_format();
72
73 let bbox = if let Some(numlist) = &args.extent {
74 let arr: Vec<f64> = numlist
75 .split(',')
76 .map(|v| {
77 v.parse()
78 .expect("Error parsing 'extent' as list of float values")
79 })
80 .collect();
81 if arr.len() != 4 {
82 anyhow::bail!("Invalid extent (minx,miny,maxx,maxy)");
83 }
84 BoundingBox::new(arr[0], arr[1], arr[2], arr[3])
85 } else {
86 tms.xy_bbox()
87 };
88
89 let Some(cache_cfg) = tileset.cache_config() else {
90 return Err(
91 ServiceError::TilesetNotFound("Cache configuration not found".to_string()).into(),
92 );
93 };
94 let tile_writer = Arc::new(tileset.store_writer.clone().unwrap());
95 let tile_store_writer = tileset.store_writer.clone().unwrap();
96 let compression = tile_writer.compression();
97
98 let threads = args.threads.unwrap_or(num_cpus::get());
100
101 let minzoom = args.minzoom.unwrap_or(0);
102 let maxzoom = args.maxzoom.unwrap_or(tms.maxzoom());
103 let griditer = tms.xyz_iterator(&bbox, minzoom, maxzoom);
104 info!("Seeding tiles from level {minzoom} to {maxzoom}");
105
106 let iter = griditer.map(move |xyz| {
112 let path = CacheLayout::Zxy.path_string(&PathBuf::new(), &xyz, &format);
113 progress.set_message(path.clone());
114 progress.inc(1);
115 xyz
116 });
117 let par_stream = stream::iter(iter).par_then(threads, move |xyz| {
118 let tileset = tileset_arc.clone();
119 let tms = tms.clone(); let filter = FilterParams::default();
121 let compression = compression.clone();
122 async move {
123 let tile = tileset
124 .read_tile(&tms, &xyz, &filter, &format, compression)
125 .await
126 .unwrap();
127 (xyz, tile)
128 }
129 });
130
131 match cache_cfg {
132 TileStoreCfg::Files(_cfg) => {
133 par_stream
134 .par_then(threads, move |(xyz, tile)| {
135 let tile_writer = tile_writer.clone();
136 async move {
137 let _ = tile_writer.put_tile(&xyz, tile).await;
138 }
139 })
140 .count()
141 .await;
142 }
143 TileStoreCfg::S3(cfg) => {
144 info!("Writing tiles to {}", &cfg.path);
145 let s3_writer_thread_count = args.tasks.unwrap_or(256);
146 par_stream
147 .par_then(s3_writer_thread_count, move |(xyz, tile)| {
148 let s3_writer = tile_writer.clone();
149 async move {
150 let _ = s3_writer.put_tile(&xyz, tile).await;
151 }
152 })
153 .count()
154 .await;
155 }
156 TileStoreCfg::Mbtiles(_) | TileStoreCfg::Pmtiles(_) => {
157 let tile_writer = tile_store_writer.clone();
158 let batch_size = 200; par_stream
160 .stateful_batching(tile_writer, |mut tile_writer, mut stream| async move {
161 let mut batch = Vec::with_capacity(batch_size);
162 while let Some((xyz, tile)) = stream.next().await {
163 batch.push((xyz.z, xyz.x as u32, xyz.y as u32, tile));
164 if batch.len() >= batch.capacity() {
167 break;
168 }
169 }
170 let empty = batch.is_empty();
171 let _ = tile_writer.put_tiles(&batch).await;
172 if empty {
173 let _ = tile_writer.finalize();
174 }
175 (!empty).then_some(((), tile_writer, stream))
176 })
177 .count()
178 .await;
179 }
180 TileStoreCfg::NoStore => {
181 par_stream.count().await;
182 }
183 };
184
185 progress_main.set_style(
186 ProgressStyle::default_spinner().template("{elapsed_precise} ({per_sec}) {msg}"),
187 );
188 let cnt = progress_main.position() + 1;
189 let elapsed = progress_main.elapsed().as_millis() as f64 / 1000.0;
190 progress_main.finish_with_message(format!("{cnt} tiles generated in {elapsed:.2}s"));
191
192 Ok(())
193 }
194
195 pub async fn upload(&self, args: &UploadArgs) -> anyhow::Result<()> {
196 match args.mode {
197 Mode::Sequential => s3putfiles::put_files_seq(args).await,
198 Mode::Tasks => s3putfiles::put_files_tasks(args).await,
199 Mode::Channels => s3putfiles::put_files_channels(args).await,
200 }
201 }
202}