// archiver_core/etl/executor.rs
use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::time::interval;
7use tracing::{debug, error, info, warn};
8
/// Initial value of `EtlExecutor::move_timeout`: the longest a single file
/// move may run before `move_file` abandons it with a timeout error (24 hours).
const DEFAULT_MOVE_TIMEOUT: Duration = Duration::from_secs(24 * 3600);
12
13use crate::registry::{PvRegistry, PvStatus};
14use crate::storage::plainpb::PlainPbStoragePlugin;
15use crate::storage::plainpb::reader::PbFileReader;
16use crate::storage::traits::{EventStream, StoragePlugin};
17
/// Periodically moves `.pb` files from a source storage plugin into a
/// destination storage plugin.
pub struct EtlExecutor {
    /// Plugin whose root folder is scanned for `.pb` files to move.
    source: Arc<PlainPbStoragePlugin>,
    /// Plugin that receives the events read out of each moved file.
    dest: Arc<PlainPbStoragePlugin>,
    /// Seconds between ETL passes started by `run`.
    period_secs: u64,
    /// Number of `.pb` files at the end of the sorted listing that a pass
    /// leaves in the source; nothing is moved unless more files than this exist.
    hold: u32,
    /// Upper bound on the number of files selected in a single pass.
    gather: u32,
    /// Time budget for moving one file; set to `DEFAULT_MOVE_TIMEOUT` by `new`.
    move_timeout: Duration,
    /// Optional registry consulted in `run_once` to skip PVs that are paused.
    pv_registry: Option<Arc<PvRegistry>>,
}
34
35impl EtlExecutor {
36 pub fn new(
37 source: Arc<PlainPbStoragePlugin>,
38 dest: Arc<PlainPbStoragePlugin>,
39 period_secs: u64,
40 hold: u32,
41 gather: u32,
42 ) -> Self {
43 Self {
44 source,
45 dest,
46 period_secs,
47 hold,
48 gather,
49 move_timeout: DEFAULT_MOVE_TIMEOUT,
50 pv_registry: None,
51 }
52 }
53
54 pub fn with_pv_registry(mut self, registry: Arc<PvRegistry>) -> Self {
60 self.pv_registry = Some(registry);
61 self
62 }
63
64 pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
66 let mut tick = interval(Duration::from_secs(self.period_secs));
67 info!(
68 source = self.source.name(),
69 dest = self.dest.name(),
70 "ETL executor started"
71 );
72
73 loop {
74 tokio::select! {
75 _ = tick.tick() => {
76 if let Err(e) = self.run_once().await {
77 error!("ETL error: {e}");
78 }
79 }
80 _ = shutdown.changed() => {
81 info!("ETL executor shutting down");
82 break;
83 }
84 }
85 }
86 }
87
88 async fn run_once(&self) -> anyhow::Result<()> {
91 if crate::flags::skip_tier_for_etl(self.dest.name()) {
95 debug!(
96 dest = self.dest.name(),
97 "ETL skipped: SKIP_<DEST>_FOR_ETL flag set"
98 );
99 return Ok(());
100 }
101
102 let source_root = self.source.root_folder();
103 if !source_root.exists() {
104 return Ok(());
105 }
106
107 let mut pb_files = list_pb_files(source_root)?;
108 pb_files.sort();
109
110 if pb_files.len() <= self.hold as usize {
111 debug!(
112 count = pb_files.len(),
113 hold = self.hold,
114 "Not enough partitions to trigger ETL"
115 );
116 return Ok(());
117 }
118
119 let files_to_move = pb_files
120 .len()
121 .saturating_sub(self.hold as usize)
122 .min(self.gather as usize);
123
124 let files_subset: Vec<PathBuf> = pb_files.into_iter().take(files_to_move).collect();
126 let grouped = group_files_by_pv(&files_subset);
127
128 let paused_keys: HashSet<String> = match self.pv_registry.as_ref() {
132 Some(reg) => reg
133 .pvs_by_status(PvStatus::Paused)
134 .map(|recs| {
135 recs.into_iter()
136 .map(|r| crate::storage::plainpb::pv_name_to_key(&r.pv_name))
137 .collect()
138 })
139 .unwrap_or_else(|e| {
140 warn!("ETL: failed to read paused PVs from registry: {e}");
141 HashSet::new()
142 }),
143 None => HashSet::new(),
144 };
145
146 for (pv_key, files) in &grouped {
147 if paused_keys.contains(pv_key) {
148 debug!(pv = pv_key, "ETL skipping paused PV");
149 continue;
150 }
151 debug!(pv = pv_key, count = files.len(), "ETL processing PV group");
152 for file in files {
153 info!(?file, dest = self.dest.name(), "ETL moving file");
154 if let Err(e) = self.move_file(file).await {
155 warn!(?file, "ETL failed to move file: {e}");
156 }
157 }
158 }
159
160 Ok(())
161 }
162
163 pub fn source_name(&self) -> &str {
164 self.source.name()
165 }
166
167 pub fn dest_name(&self) -> &str {
168 self.dest.name()
169 }
170
171 pub async fn consolidate_pv(&self, pv: &str) -> anyhow::Result<u64> {
178 self.source.flush_writes().await?;
181
182 let pv_files =
183 crate::storage::plainpb::list_pv_pb_files_pub(self.source.root_folder(), pv)?;
184 let total = pv_files.len() as u64;
185 info!(
186 pv,
187 total,
188 source = self.source.name(),
189 dest = self.dest.name(),
190 "Consolidating PV files",
191 );
192 for file in &pv_files {
193 if let Err(e) = self.move_file(file).await {
194 warn!(?file, "consolidate_pv: failed to move file: {e}");
195 return Err(e);
196 }
197 }
198 Ok(total)
199 }
200
201 async fn move_file(&self, source_path: &Path) -> anyhow::Result<()> {
204 let marker = source_path.with_extension("pb.etl_done");
206 if marker.exists() {
207 info!(
208 ?source_path,
209 "Found ETL marker — previous copy completed, cleaning up"
210 );
211 if let Err(e) = tokio::fs::remove_file(source_path).await {
212 warn!(
213 ?source_path,
214 "Failed to remove source after ETL marker found: {e}"
215 );
216 }
217 let source = self.source.clone();
225 let path_for_evict = source_path.to_path_buf();
226 let _ =
227 tokio::task::spawn_blocking(move || source.evict_writer_for_path(&path_for_evict))
228 .await;
229 if let Err(e) = tokio::fs::remove_file(&marker).await {
230 warn!(?marker, "Failed to remove ETL marker: {e}");
231 }
232 return Ok(());
233 }
234
235 let timeout = self.move_timeout;
238 let source_path = source_path.to_path_buf();
239 let source_path_disp = source_path.clone();
240 let dest = self.dest.clone();
241 let source = self.source.clone();
242 let source_name = self.source.name().to_string();
243 let dest_name = self.dest.name().to_string();
244
245 tokio::time::timeout(timeout, async move {
246 let mut reader = PbFileReader::open(&source_path)?;
247 let desc = reader.description().clone();
248 let dbr_type = desc.db_type;
249
250 while let Some(sample) = reader.next_event()? {
252 dest.append_event(&desc.pv_name, dbr_type, &sample).await?;
253 }
254
255 let marker = source_path.with_extension("pb.etl_done");
258 tokio::fs::write(&marker, b"").await?;
259 tokio::fs::remove_file(&source_path).await?;
260 let source_for_evict = source.clone();
265 let path_for_evict = source_path.clone();
266 let _ = tokio::task::spawn_blocking(move || {
267 source_for_evict.evict_writer_for_path(&path_for_evict)
268 })
269 .await;
270 tokio::fs::remove_file(&marker).await.ok();
271 metrics::counter!(
272 "archiver_etl_files_moved_total",
273 "source" => source_name,
274 "dest" => dest_name,
275 )
276 .increment(1);
277 anyhow::Ok(())
278 })
279 .await
280 .map_err(|_| {
281 anyhow::anyhow!("ETL move_file timed out after {timeout:?} for {source_path_disp:?}")
282 })??;
283
284 Ok(())
285 }
286}
287
288fn group_files_by_pv(files: &[PathBuf]) -> HashMap<String, Vec<&PathBuf>> {
291 let mut groups: HashMap<String, Vec<&PathBuf>> = HashMap::new();
292 for file in files {
293 let pv_key = extract_pv_key(file);
294 groups.entry(pv_key).or_default().push(file);
295 }
296 groups
297}
298
/// Derives a PV key from the path of a partition file.
///
/// The file name (minus a trailing `.pb`) is cut at its first `:`; the part
/// before the colon is the leaf. When the path has a named parent directory
/// the key is `"<parent>/<leaf>"`, otherwise just the leaf. Paths whose file
/// name is missing, not valid UTF-8, or has no `:` yield the whole path as
/// a (lossily converted) string.
fn extract_pv_key(path: &Path) -> String {
    let leaf = path
        .file_name()
        .and_then(|name| name.to_str())
        .map(|name| name.strip_suffix(".pb").unwrap_or(name))
        .and_then(|stem| stem.split_once(':'))
        .map(|(before_colon, _partition)| before_colon);

    match leaf {
        None => path.to_string_lossy().to_string(),
        Some(leaf) => {
            let parent_dir = path
                .parent()
                .and_then(|dir| dir.file_name())
                .and_then(|name| name.to_str());
            match parent_dir {
                Some(parent) => format!("{parent}/{leaf}"),
                None => leaf.to_string(),
            }
        }
    }
}
320
321fn list_pb_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
323 let mut files = Vec::new();
324 if dir.is_dir() {
325 for entry in std::fs::read_dir(dir)? {
326 let entry = entry?;
327 let path = entry.path();
328 if path.is_dir() {
329 files.extend(list_pb_files(&path)?);
330 } else if path.extension().and_then(|e| e.to_str()) == Some("pb") {
331 files.push(path);
332 }
333 }
334 }
335 Ok(files)
336}
337
#[cfg(test)]
mod tests {
    use super::*;

    /// The key combines the immediate parent directory with the file-name
    /// prefix that precedes the colon.
    #[test]
    fn test_extract_pv_key() {
        let key = extract_pv_key(Path::new("/data/sts/SIM/Sine:2024_03_15_09.pb"));
        assert_eq!(key, "SIM/Sine");
    }

    /// Files that share a PV key end up in the same bucket.
    #[test]
    fn test_group_files_by_pv() {
        let files: Vec<PathBuf> = [
            "/data/SIM/Sine:2024_03_01.pb",
            "/data/SIM/Sine:2024_03_02.pb",
            "/data/SIM/Cosine:2024_03_01.pb",
        ]
        .iter()
        .map(|raw| PathBuf::from(*raw))
        .collect();

        let by_pv = group_files_by_pv(&files);

        assert_eq!(by_pv.len(), 2);
        assert_eq!(by_pv["SIM/Sine"].len(), 2);
        assert_eq!(by_pv["SIM/Cosine"].len(), 1);
    }
}
360}