Skip to main content

archiver_core/etl/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::time::interval;
7use tracing::{debug, error, info, warn};
8
// Default upper bound for moving a single file.
// Java parity (3daedae): the Java implementation called f.get() with no
// timeout and hung indefinitely on slow NFS; it settled on a 24 h bound,
// which is mirrored here.
const DEFAULT_MOVE_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
12
13use crate::registry::{PvRegistry, PvStatus};
14use crate::storage::plainpb::PlainPbStoragePlugin;
15use crate::storage::plainpb::reader::PbFileReader;
16use crate::storage::traits::{EventStream, StoragePlugin};
17
18/// ETL executor — periodically moves data from source tier to destination tier.
19pub struct EtlExecutor {
20    source: Arc<PlainPbStoragePlugin>,
21    dest: Arc<PlainPbStoragePlugin>,
22    /// How often to run ETL (seconds).
23    period_secs: u64,
24    /// Number of partitions to hold in source before moving.
25    hold: u32,
26    /// Number of partitions to gather (move out) at once.
27    gather: u32,
28    /// Per-file move timeout (Java parity 3daedae).
29    move_timeout: Duration,
30    /// Optional PV registry — when present, paused PVs are skipped
31    /// (Java parity 92db337).
32    pv_registry: Option<Arc<PvRegistry>>,
33}
34
35impl EtlExecutor {
36    pub fn new(
37        source: Arc<PlainPbStoragePlugin>,
38        dest: Arc<PlainPbStoragePlugin>,
39        period_secs: u64,
40        hold: u32,
41        gather: u32,
42    ) -> Self {
43        Self {
44            source,
45            dest,
46            period_secs,
47            hold,
48            gather,
49            move_timeout: DEFAULT_MOVE_TIMEOUT,
50            pv_registry: None,
51        }
52    }
53
54    /// Wire a PV registry so the executor can skip paused PVs in
55    /// `run_once`. Java parity (92db337): without this, PB files for a
56    /// paused PV continue to migrate out of the STS, which surprises
57    /// operators who expect the data to stay accessible there until the
58    /// PV resumes.
59    pub fn with_pv_registry(mut self, registry: Arc<PvRegistry>) -> Self {
60        self.pv_registry = Some(registry);
61        self
62    }
63
64    /// Run the ETL loop. Call this as a spawned task.
65    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
66        let mut tick = interval(Duration::from_secs(self.period_secs));
67        info!(
68            source = self.source.name(),
69            dest = self.dest.name(),
70            "ETL executor started"
71        );
72
73        loop {
74            tokio::select! {
75                _ = tick.tick() => {
76                    if let Err(e) = self.run_once().await {
77                        error!("ETL error: {e}");
78                    }
79                }
80                _ = shutdown.changed() => {
81                    info!("ETL executor shutting down");
82                    break;
83                }
84            }
85        }
86    }
87
88    /// Execute one round of ETL: find old partition files in source, move to dest.
89    /// Groups files by PV name for coherent transfers.
90    async fn run_once(&self) -> anyhow::Result<()> {
91        // Operator-controlled bypass (Java's SKIP_<NAME>_FOR_ETL named flag,
92        // adc5889a). Set during e.g. an OS migration to pause writes into a
93        // particular tier without restarting the appliance.
94        if crate::flags::skip_tier_for_etl(self.dest.name()) {
95            debug!(
96                dest = self.dest.name(),
97                "ETL skipped: SKIP_<DEST>_FOR_ETL flag set"
98            );
99            return Ok(());
100        }
101
102        let source_root = self.source.root_folder();
103        if !source_root.exists() {
104            return Ok(());
105        }
106
107        let mut pb_files = list_pb_files(source_root)?;
108        pb_files.sort();
109
110        if pb_files.len() <= self.hold as usize {
111            debug!(
112                count = pb_files.len(),
113                hold = self.hold,
114                "Not enough partitions to trigger ETL"
115            );
116            return Ok(());
117        }
118
119        let files_to_move = pb_files
120            .len()
121            .saturating_sub(self.hold as usize)
122            .min(self.gather as usize);
123
124        // Group files by PV name for coherent per-PV transfers.
125        let files_subset: Vec<PathBuf> = pb_files.into_iter().take(files_to_move).collect();
126        let grouped = group_files_by_pv(&files_subset);
127
128        // Java parity (92db337): skip files whose owning PV is paused.
129        // Compute the set of paused-PV file keys once per tick instead of
130        // per-file to keep the registry lookup off the hot path.
131        let paused_keys: HashSet<String> = match self.pv_registry.as_ref() {
132            Some(reg) => reg
133                .pvs_by_status(PvStatus::Paused)
134                .map(|recs| {
135                    recs.into_iter()
136                        .map(|r| crate::storage::plainpb::pv_name_to_key(&r.pv_name))
137                        .collect()
138                })
139                .unwrap_or_else(|e| {
140                    warn!("ETL: failed to read paused PVs from registry: {e}");
141                    HashSet::new()
142                }),
143            None => HashSet::new(),
144        };
145
146        for (pv_key, files) in &grouped {
147            if paused_keys.contains(pv_key) {
148                debug!(pv = pv_key, "ETL skipping paused PV");
149                continue;
150            }
151            debug!(pv = pv_key, count = files.len(), "ETL processing PV group");
152            for file in files {
153                info!(?file, dest = self.dest.name(), "ETL moving file");
154                if let Err(e) = self.move_file(file).await {
155                    warn!(?file, "ETL failed to move file: {e}");
156                }
157            }
158        }
159
160        Ok(())
161    }
162
163    pub fn source_name(&self) -> &str {
164        self.source.name()
165    }
166
167    pub fn dest_name(&self) -> &str {
168        self.dest.name()
169    }
170
171    /// Force-move every PB file the source tier currently holds for `pv`
172    /// to the destination tier, ignoring `hold` / `gather` constraints.
173    /// Drives the `consolidateDataForPV` BPL endpoint.
174    ///
175    /// The same crash-safe move_file is reused, so partial failures leave
176    /// the source either fully migrated or untouched.
177    pub async fn consolidate_pv(&self, pv: &str) -> anyhow::Result<u64> {
178        // Flush any buffered writes for the source tier so we move
179        // everything that's been written so far.
180        self.source.flush_writes().await?;
181
182        let pv_files =
183            crate::storage::plainpb::list_pv_pb_files_pub(self.source.root_folder(), pv)?;
184        let total = pv_files.len() as u64;
185        info!(
186            pv,
187            total,
188            source = self.source.name(),
189            dest = self.dest.name(),
190            "Consolidating PV files",
191        );
192        for file in &pv_files {
193            if let Err(e) = self.move_file(file).await {
194                warn!(?file, "consolidate_pv: failed to move file: {e}");
195                return Err(e);
196            }
197        }
198        Ok(total)
199    }
200
201    /// Move a single PB file from source to destination tier.
202    /// Uses copy → verify → marker → delete for crash-safe idempotency.
203    async fn move_file(&self, source_path: &Path) -> anyhow::Result<()> {
204        // Check for a marker from a previous incomplete cleanup (crash after copy).
205        let marker = source_path.with_extension("pb.etl_done");
206        if marker.exists() {
207            info!(
208                ?source_path,
209                "Found ETL marker — previous copy completed, cleaning up"
210            );
211            if let Err(e) = tokio::fs::remove_file(source_path).await {
212                warn!(
213                    ?source_path,
214                    "Failed to remove source after ETL marker found: {e}"
215                );
216            }
217            // Drop any open BufWriter on the just-deleted path so
218            // the engine doesn't continue writing into the orphaned
219            // inode. `evict_writer_for_path` blocking-locks each
220            // candidate slot to be definitive (principle 3:
221            // externally visible truth has changed; in-memory
222            // owner state must sync), so wrap in spawn_blocking
223            // to keep the runtime worker free during the wait.
224            let source = self.source.clone();
225            let path_for_evict = source_path.to_path_buf();
226            let _ = tokio::task::spawn_blocking(move || {
227                source.evict_writer_for_path(&path_for_evict)
228            })
229            .await;
230            if let Err(e) = tokio::fs::remove_file(&marker).await {
231                warn!(?marker, "Failed to remove ETL marker: {e}");
232            }
233            return Ok(());
234        }
235
236        // Java parity (3daedae): wrap the copy+delete in a timeout so a
237        // hung NFS mount doesn't block the ETL loop indefinitely.
238        let timeout = self.move_timeout;
239        let source_path = source_path.to_path_buf();
240        let source_path_disp = source_path.clone();
241        let dest = self.dest.clone();
242        let source = self.source.clone();
243        let source_name = self.source.name().to_string();
244        let dest_name = self.dest.name().to_string();
245
246        tokio::time::timeout(timeout, async move {
247            let mut reader = PbFileReader::open(&source_path)?;
248            let desc = reader.description().clone();
249            let dbr_type = desc.db_type;
250
251            // Copy all samples to destination.
252            while let Some(sample) = reader.next_event()? {
253                dest.append_event(&desc.pv_name, dbr_type, &sample).await?;
254            }
255
256            // Write marker before deleting source — if we crash here, next run
257            // will see the marker and skip re-copy (preventing duplicate data).
258            let marker = source_path.with_extension("pb.etl_done");
259            tokio::fs::write(&marker, b"").await?;
260            tokio::fs::remove_file(&source_path).await?;
261            // Drop any open BufWriter on the just-deleted path so
262            // the engine doesn't continue writing into the orphaned
263            // inode. Blocking lock under spawn_blocking — see the
264            // marker-found branch above for the same rationale.
265            let source_for_evict = source.clone();
266            let path_for_evict = source_path.clone();
267            let _ = tokio::task::spawn_blocking(move || {
268                source_for_evict.evict_writer_for_path(&path_for_evict)
269            })
270            .await;
271            tokio::fs::remove_file(&marker).await.ok();
272            metrics::counter!(
273                "archiver_etl_files_moved_total",
274                "source" => source_name,
275                "dest" => dest_name,
276            )
277            .increment(1);
278            anyhow::Ok(())
279        })
280        .await
281        .map_err(|_| {
282            anyhow::anyhow!("ETL move_file timed out after {timeout:?} for {source_path_disp:?}")
283        })??;
284
285        Ok(())
286    }
287}
288
289/// Group files by PV name. The PV name is derived from the file path structure.
290/// File path pattern: {root}/{pv_prefix}/{pv_suffix}:{partition}.pb
291fn group_files_by_pv(files: &[PathBuf]) -> HashMap<String, Vec<&PathBuf>> {
292    let mut groups: HashMap<String, Vec<&PathBuf>> = HashMap::new();
293    for file in files {
294        let pv_key = extract_pv_key(file);
295        groups.entry(pv_key).or_default().push(file);
296    }
297    groups
298}
299
/// Derive the PV key from a PB file path.
///
/// The key is `<parent dir name>/<leaf>`, where `<leaf>` is the part of the
/// file name (minus a trailing `.pb`) in front of its first `:`.
/// Example: `.../SIM/Sine:2024_03_15_09.pb` gives `"SIM/Sine"`.
///
/// Fallbacks:
/// - no usable parent directory name: just the leaf is returned;
/// - no `:` in the file name, or a file name that is not valid UTF-8: the
///   whole path (lossily converted) is returned.
fn extract_pv_key(path: &Path) -> String {
    // Leaf = file name without ".pb", cut at the first ':'.
    let leaf = path
        .file_name()
        .and_then(|name| name.to_str())
        .map(|name| name.strip_suffix(".pb").unwrap_or(name))
        .and_then(|stem| stem.split_once(':'))
        .map(|(leaf, _partition)| leaf);

    match leaf {
        None => path.to_string_lossy().to_string(),
        Some(leaf) => {
            // Prefix with the name of the directory that holds the file.
            let parent_dir = path
                .parent()
                .and_then(Path::file_name)
                .and_then(|dir| dir.to_str());
            match parent_dir {
                Some(parent) => format!("{parent}/{leaf}"),
                None => leaf.to_string(),
            }
        }
    }
}
321
322/// Recursively list all .pb files under a directory.
323fn list_pb_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
324    let mut files = Vec::new();
325    if dir.is_dir() {
326        for entry in std::fs::read_dir(dir)? {
327            let entry = entry?;
328            let path = entry.path();
329            if path.is_dir() {
330                files.extend(list_pb_files(&path)?);
331            } else if path.extension().and_then(|e| e.to_str()) == Some("pb") {
332                files.push(path);
333            }
334        }
335    }
336    Ok(files)
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// The key is built from the parent directory name and the part of the
    /// file name in front of the colon.
    #[test]
    fn test_extract_pv_key() {
        let key = extract_pv_key(Path::new("/data/sts/SIM/Sine:2024_03_15_09.pb"));
        assert_eq!(key, "SIM/Sine");
    }

    /// Two partitions of one PV share a bucket; a second PV gets its own.
    #[test]
    fn test_group_files_by_pv() {
        let files: Vec<PathBuf> = vec![
            "/data/SIM/Sine:2024_03_01.pb",
            "/data/SIM/Sine:2024_03_02.pb",
            "/data/SIM/Cosine:2024_03_01.pb",
        ]
        .into_iter()
        .map(PathBuf::from)
        .collect();

        let grouped = group_files_by_pv(&files);

        assert_eq!(grouped.len(), 2);
        assert_eq!(grouped.get("SIM/Sine").map(|g| g.len()), Some(2));
        assert_eq!(grouped.get("SIM/Cosine").map(|g| g.len()), Some(1));
    }
}
361}