archiver_core/etl/
executor.rs1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::time::interval;
7use tracing::{debug, error, info, warn};
8
/// Default per-file time limit (24 hours) that `EtlExecutor::new` assigns to
/// `move_timeout`; a single file move running longer than this is reported as a timeout.
const DEFAULT_MOVE_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
12
13use crate::registry::{PvRegistry, PvStatus};
14use crate::storage::plainpb::PlainPbStoragePlugin;
15use crate::storage::plainpb::reader::PbFileReader;
16use crate::storage::traits::{EventStream, StoragePlugin};
17
18pub struct EtlExecutor {
20 source: Arc<PlainPbStoragePlugin>,
21 dest: Arc<PlainPbStoragePlugin>,
22 period_secs: u64,
24 hold: u32,
26 gather: u32,
28 move_timeout: Duration,
30 pv_registry: Option<Arc<PvRegistry>>,
33}
34
35impl EtlExecutor {
36 pub fn new(
37 source: Arc<PlainPbStoragePlugin>,
38 dest: Arc<PlainPbStoragePlugin>,
39 period_secs: u64,
40 hold: u32,
41 gather: u32,
42 ) -> Self {
43 Self {
44 source,
45 dest,
46 period_secs,
47 hold,
48 gather,
49 move_timeout: DEFAULT_MOVE_TIMEOUT,
50 pv_registry: None,
51 }
52 }
53
54 pub fn with_pv_registry(mut self, registry: Arc<PvRegistry>) -> Self {
60 self.pv_registry = Some(registry);
61 self
62 }
63
64 pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
66 let mut tick = interval(Duration::from_secs(self.period_secs));
67 info!(
68 source = self.source.name(),
69 dest = self.dest.name(),
70 "ETL executor started"
71 );
72
73 loop {
74 tokio::select! {
75 _ = tick.tick() => {
76 if let Err(e) = self.run_once().await {
77 error!("ETL error: {e}");
78 }
79 }
80 _ = shutdown.changed() => {
81 info!("ETL executor shutting down");
82 break;
83 }
84 }
85 }
86 }
87
88 async fn run_once(&self) -> anyhow::Result<()> {
91 if crate::flags::skip_tier_for_etl(self.dest.name()) {
95 debug!(
96 dest = self.dest.name(),
97 "ETL skipped: SKIP_<DEST>_FOR_ETL flag set"
98 );
99 return Ok(());
100 }
101
102 let source_root = self.source.root_folder();
103 if !source_root.exists() {
104 return Ok(());
105 }
106
107 let mut pb_files = list_pb_files(source_root)?;
108 pb_files.sort();
109
110 if pb_files.len() <= self.hold as usize {
111 debug!(
112 count = pb_files.len(),
113 hold = self.hold,
114 "Not enough partitions to trigger ETL"
115 );
116 return Ok(());
117 }
118
119 let files_to_move = pb_files
120 .len()
121 .saturating_sub(self.hold as usize)
122 .min(self.gather as usize);
123
124 let files_subset: Vec<PathBuf> = pb_files.into_iter().take(files_to_move).collect();
126 let grouped = group_files_by_pv(&files_subset);
127
128 let paused_keys: HashSet<String> = match self.pv_registry.as_ref() {
132 Some(reg) => reg
133 .pvs_by_status(PvStatus::Paused)
134 .map(|recs| {
135 recs.into_iter()
136 .map(|r| crate::storage::plainpb::pv_name_to_key(&r.pv_name))
137 .collect()
138 })
139 .unwrap_or_else(|e| {
140 warn!("ETL: failed to read paused PVs from registry: {e}");
141 HashSet::new()
142 }),
143 None => HashSet::new(),
144 };
145
146 for (pv_key, files) in &grouped {
147 if paused_keys.contains(pv_key) {
148 debug!(pv = pv_key, "ETL skipping paused PV");
149 continue;
150 }
151 debug!(pv = pv_key, count = files.len(), "ETL processing PV group");
152 for file in files {
153 info!(?file, dest = self.dest.name(), "ETL moving file");
154 if let Err(e) = self.move_file(file).await {
155 warn!(?file, "ETL failed to move file: {e}");
156 }
157 }
158 }
159
160 Ok(())
161 }
162
163 pub fn source_name(&self) -> &str {
164 self.source.name()
165 }
166
167 pub fn dest_name(&self) -> &str {
168 self.dest.name()
169 }
170
171 pub async fn consolidate_pv(&self, pv: &str) -> anyhow::Result<u64> {
178 self.source.flush_writes().await?;
181
182 let pv_files =
183 crate::storage::plainpb::list_pv_pb_files_pub(self.source.root_folder(), pv)?;
184 let total = pv_files.len() as u64;
185 info!(
186 pv,
187 total,
188 source = self.source.name(),
189 dest = self.dest.name(),
190 "Consolidating PV files",
191 );
192 for file in &pv_files {
193 if let Err(e) = self.move_file(file).await {
194 warn!(?file, "consolidate_pv: failed to move file: {e}");
195 return Err(e);
196 }
197 }
198 Ok(total)
199 }
200
201 async fn move_file(&self, source_path: &Path) -> anyhow::Result<()> {
204 let marker = source_path.with_extension("pb.etl_done");
206 if marker.exists() {
207 info!(
208 ?source_path,
209 "Found ETL marker — previous copy completed, cleaning up"
210 );
211 if let Err(e) = tokio::fs::remove_file(source_path).await {
212 warn!(
213 ?source_path,
214 "Failed to remove source after ETL marker found: {e}"
215 );
216 }
217 if let Err(e) = tokio::fs::remove_file(&marker).await {
218 warn!(?marker, "Failed to remove ETL marker: {e}");
219 }
220 return Ok(());
221 }
222
223 let timeout = self.move_timeout;
226 let source_path = source_path.to_path_buf();
227 let source_path_disp = source_path.clone();
228 let dest = self.dest.clone();
229 let source_name = self.source.name().to_string();
230 let dest_name = self.dest.name().to_string();
231
232 tokio::time::timeout(timeout, async move {
233 let mut reader = PbFileReader::open(&source_path)?;
234 let desc = reader.description().clone();
235 let dbr_type = desc.db_type;
236
237 while let Some(sample) = reader.next_event()? {
239 dest.append_event(&desc.pv_name, dbr_type, &sample).await?;
240 }
241
242 let marker = source_path.with_extension("pb.etl_done");
245 tokio::fs::write(&marker, b"").await?;
246 tokio::fs::remove_file(&source_path).await?;
247 tokio::fs::remove_file(&marker).await.ok();
248 metrics::counter!(
249 "archiver_etl_files_moved_total",
250 "source" => source_name,
251 "dest" => dest_name,
252 )
253 .increment(1);
254 anyhow::Ok(())
255 })
256 .await
257 .map_err(|_| {
258 anyhow::anyhow!("ETL move_file timed out after {timeout:?} for {source_path_disp:?}")
259 })??;
260
261 Ok(())
262 }
263}
264
265fn group_files_by_pv(files: &[PathBuf]) -> HashMap<String, Vec<&PathBuf>> {
268 let mut groups: HashMap<String, Vec<&PathBuf>> = HashMap::new();
269 for file in files {
270 let pv_key = extract_pv_key(file);
271 groups.entry(pv_key).or_default().push(file);
272 }
273 groups
274}
275
/// Derives a PV grouping key from a partition file path.
///
/// For a file named `<leaf>:<partition>[.pb]`, the key is `<parent>/<leaf>`, where
/// `<parent>` is the name of the immediate parent directory. The key is just `<leaf>`
/// when there is no (UTF-8) parent name. Only the first `:` separates leaf and
/// partition.
///
/// NOTE(review): only one directory level is used, so e.g. `A/X/Sine:…` and
/// `B/X/Sine:…` both map to `X/Sine`.
///
/// If the file name is not UTF-8 or contains no `:`, the whole path (lossily
/// converted) is returned.
fn extract_pv_key(path: &Path) -> String {
    let key = path
        .file_name()
        .and_then(|n| n.to_str())
        .map(|name| name.strip_suffix(".pb").unwrap_or(name))
        .and_then(|stem| stem.split_once(':'))
        .map(|(leaf, _partition)| {
            match path
                .parent()
                .and_then(Path::file_name)
                .and_then(|n| n.to_str())
            {
                Some(parent) => format!("{parent}/{leaf}"),
                None => leaf.to_owned(),
            }
        });
    key.unwrap_or_else(|| path.to_string_lossy().into_owned())
}
297
298fn list_pb_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
300 let mut files = Vec::new();
301 if dir.is_dir() {
302 for entry in std::fs::read_dir(dir)? {
303 let entry = entry?;
304 let path = entry.path();
305 if path.is_dir() {
306 files.extend(list_pb_files(&path)?);
307 } else if path.extension().and_then(|e| e.to_str()) == Some("pb") {
308 files.push(path);
309 }
310 }
311 }
312 Ok(files)
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_pv_key() {
        let path = PathBuf::from("/data/sts/SIM/Sine:2024_03_15_09.pb");
        assert_eq!(extract_pv_key(&path), "SIM/Sine");
    }

    #[test]
    fn test_extract_pv_key_edge_cases() {
        // Only the first ':' separates the PV leaf from the partition name.
        assert_eq!(extract_pv_key(Path::new("/data/SIM/Sine:2024:x.pb")), "SIM/Sine");
        // No named parent directory: the leaf alone is the key.
        assert_eq!(extract_pv_key(Path::new("Sine:2024.pb")), "Sine");
        // No ':' in the file name: the whole path is used as the key.
        assert_eq!(
            extract_pv_key(Path::new("/data/SIM/plain.pb")),
            "/data/SIM/plain.pb"
        );
    }

    #[test]
    fn test_group_files_by_pv() {
        let files = vec![
            PathBuf::from("/data/SIM/Sine:2024_03_01.pb"),
            PathBuf::from("/data/SIM/Sine:2024_03_02.pb"),
            PathBuf::from("/data/SIM/Cosine:2024_03_01.pb"),
        ];
        let grouped = group_files_by_pv(&files);
        assert_eq!(grouped.len(), 2);
        assert_eq!(grouped["SIM/Sine"].len(), 2);
        assert_eq!(grouped["SIM/Cosine"].len(), 1);
    }

    #[test]
    fn test_group_files_by_pv_empty() {
        assert!(group_files_by_pv(&[]).is_empty());
    }
}
337}