// archiver_core/etl/executor.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::time::interval;
7use tracing::{debug, error, info, warn};
8
// Default upper bound for moving one file between tiers.
// Java parity (3daedae): Java's `f.get()` had no timeout and could hang
// forever on a slow NFS mount; Java settled on a 24-hour bound, which is
// mirrored here.
const DEFAULT_MOVE_TIMEOUT: Duration = Duration::from_secs(60 * 60 * 24);
12
13use crate::registry::{PvRegistry, PvStatus};
14use crate::storage::plainpb::PlainPbStoragePlugin;
15use crate::storage::plainpb::reader::PbFileReader;
16use crate::storage::traits::{EventStream, StoragePlugin};
17
18/// ETL executor — periodically moves data from source tier to destination tier.
19pub struct EtlExecutor {
20    source: Arc<PlainPbStoragePlugin>,
21    dest: Arc<PlainPbStoragePlugin>,
22    /// How often to run ETL (seconds).
23    period_secs: u64,
24    /// Number of partitions to hold in source before moving.
25    hold: u32,
26    /// Number of partitions to gather (move out) at once.
27    gather: u32,
28    /// Per-file move timeout (Java parity 3daedae).
29    move_timeout: Duration,
30    /// Optional PV registry — when present, paused PVs are skipped
31    /// (Java parity 92db337).
32    pv_registry: Option<Arc<PvRegistry>>,
33}
34
35impl EtlExecutor {
36    pub fn new(
37        source: Arc<PlainPbStoragePlugin>,
38        dest: Arc<PlainPbStoragePlugin>,
39        period_secs: u64,
40        hold: u32,
41        gather: u32,
42    ) -> Self {
43        Self {
44            source,
45            dest,
46            period_secs,
47            hold,
48            gather,
49            move_timeout: DEFAULT_MOVE_TIMEOUT,
50            pv_registry: None,
51        }
52    }
53
54    /// Wire a PV registry so the executor can skip paused PVs in
55    /// `run_once`. Java parity (92db337): without this, PB files for a
56    /// paused PV continue to migrate out of the STS, which surprises
57    /// operators who expect the data to stay accessible there until the
58    /// PV resumes.
59    pub fn with_pv_registry(mut self, registry: Arc<PvRegistry>) -> Self {
60        self.pv_registry = Some(registry);
61        self
62    }
63
64    /// Run the ETL loop. Call this as a spawned task.
65    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
66        let mut tick = interval(Duration::from_secs(self.period_secs));
67        info!(
68            source = self.source.name(),
69            dest = self.dest.name(),
70            "ETL executor started"
71        );
72
73        loop {
74            tokio::select! {
75                _ = tick.tick() => {
76                    if let Err(e) = self.run_once().await {
77                        error!("ETL error: {e}");
78                    }
79                }
80                _ = shutdown.changed() => {
81                    info!("ETL executor shutting down");
82                    break;
83                }
84            }
85        }
86    }
87
88    /// Execute one round of ETL: find old partition files in source, move to dest.
89    /// Groups files by PV name for coherent transfers.
90    async fn run_once(&self) -> anyhow::Result<()> {
91        // Operator-controlled bypass (Java's SKIP_<NAME>_FOR_ETL named flag,
92        // adc5889a). Set during e.g. an OS migration to pause writes into a
93        // particular tier without restarting the appliance.
94        if crate::flags::skip_tier_for_etl(self.dest.name()) {
95            debug!(
96                dest = self.dest.name(),
97                "ETL skipped: SKIP_<DEST>_FOR_ETL flag set"
98            );
99            return Ok(());
100        }
101
102        let source_root = self.source.root_folder();
103        if !source_root.exists() {
104            return Ok(());
105        }
106
107        let mut pb_files = list_pb_files(source_root)?;
108        pb_files.sort();
109
110        if pb_files.len() <= self.hold as usize {
111            debug!(
112                count = pb_files.len(),
113                hold = self.hold,
114                "Not enough partitions to trigger ETL"
115            );
116            return Ok(());
117        }
118
119        let files_to_move = pb_files
120            .len()
121            .saturating_sub(self.hold as usize)
122            .min(self.gather as usize);
123
124        // Group files by PV name for coherent per-PV transfers.
125        let files_subset: Vec<PathBuf> = pb_files.into_iter().take(files_to_move).collect();
126        let grouped = group_files_by_pv(&files_subset);
127
128        // Java parity (92db337): skip files whose owning PV is paused.
129        // Compute the set of paused-PV file keys once per tick instead of
130        // per-file to keep the registry lookup off the hot path.
131        let paused_keys: HashSet<String> = match self.pv_registry.as_ref() {
132            Some(reg) => reg
133                .pvs_by_status(PvStatus::Paused)
134                .map(|recs| {
135                    recs.into_iter()
136                        .map(|r| crate::storage::plainpb::pv_name_to_key(&r.pv_name))
137                        .collect()
138                })
139                .unwrap_or_else(|e| {
140                    warn!("ETL: failed to read paused PVs from registry: {e}");
141                    HashSet::new()
142                }),
143            None => HashSet::new(),
144        };
145
146        for (pv_key, files) in &grouped {
147            if paused_keys.contains(pv_key) {
148                debug!(pv = pv_key, "ETL skipping paused PV");
149                continue;
150            }
151            debug!(pv = pv_key, count = files.len(), "ETL processing PV group");
152            for file in files {
153                info!(?file, dest = self.dest.name(), "ETL moving file");
154                if let Err(e) = self.move_file(file).await {
155                    warn!(?file, "ETL failed to move file: {e}");
156                }
157            }
158        }
159
160        Ok(())
161    }
162
163    pub fn source_name(&self) -> &str {
164        self.source.name()
165    }
166
167    pub fn dest_name(&self) -> &str {
168        self.dest.name()
169    }
170
171    /// Force-move every PB file the source tier currently holds for `pv`
172    /// to the destination tier, ignoring `hold` / `gather` constraints.
173    /// Drives the `consolidateDataForPV` BPL endpoint.
174    ///
175    /// The same crash-safe move_file is reused, so partial failures leave
176    /// the source either fully migrated or untouched.
177    pub async fn consolidate_pv(&self, pv: &str) -> anyhow::Result<u64> {
178        // Flush any buffered writes for the source tier so we move
179        // everything that's been written so far.
180        self.source.flush_writes().await?;
181
182        let pv_files =
183            crate::storage::plainpb::list_pv_pb_files_pub(self.source.root_folder(), pv)?;
184        let total = pv_files.len() as u64;
185        info!(
186            pv,
187            total,
188            source = self.source.name(),
189            dest = self.dest.name(),
190            "Consolidating PV files",
191        );
192        for file in &pv_files {
193            if let Err(e) = self.move_file(file).await {
194                warn!(?file, "consolidate_pv: failed to move file: {e}");
195                return Err(e);
196            }
197        }
198        Ok(total)
199    }
200
201    /// Move a single PB file from source to destination tier.
202    /// Uses copy → verify → marker → delete for crash-safe idempotency.
203    async fn move_file(&self, source_path: &Path) -> anyhow::Result<()> {
204        // Check for a marker from a previous incomplete cleanup (crash after copy).
205        let marker = source_path.with_extension("pb.etl_done");
206        if marker.exists() {
207            info!(
208                ?source_path,
209                "Found ETL marker — previous copy completed, cleaning up"
210            );
211            if let Err(e) = tokio::fs::remove_file(source_path).await {
212                warn!(
213                    ?source_path,
214                    "Failed to remove source after ETL marker found: {e}"
215                );
216            }
217            // Drop any open BufWriter on the just-deleted path so
218            // the engine doesn't continue writing into the orphaned
219            // inode. `evict_writer_for_path` blocking-locks each
220            // candidate slot to be definitive (principle 3:
221            // externally visible truth has changed; in-memory
222            // owner state must sync), so wrap in spawn_blocking
223            // to keep the runtime worker free during the wait.
224            let source = self.source.clone();
225            let path_for_evict = source_path.to_path_buf();
226            let _ =
227                tokio::task::spawn_blocking(move || source.evict_writer_for_path(&path_for_evict))
228                    .await;
229            if let Err(e) = tokio::fs::remove_file(&marker).await {
230                warn!(?marker, "Failed to remove ETL marker: {e}");
231            }
232            return Ok(());
233        }
234
235        // Java parity (3daedae): wrap the copy+delete in a timeout so a
236        // hung NFS mount doesn't block the ETL loop indefinitely.
237        let timeout = self.move_timeout;
238        let source_path = source_path.to_path_buf();
239        let source_path_disp = source_path.clone();
240        let dest = self.dest.clone();
241        let source = self.source.clone();
242        let source_name = self.source.name().to_string();
243        let dest_name = self.dest.name().to_string();
244
245        tokio::time::timeout(timeout, async move {
246            let mut reader = PbFileReader::open(&source_path)?;
247            let desc = reader.description().clone();
248            let dbr_type = desc.db_type;
249
250            // Copy all samples to destination.
251            while let Some(sample) = reader.next_event()? {
252                dest.append_event(&desc.pv_name, dbr_type, &sample).await?;
253            }
254
255            // Write marker before deleting source — if we crash here, next run
256            // will see the marker and skip re-copy (preventing duplicate data).
257            let marker = source_path.with_extension("pb.etl_done");
258            tokio::fs::write(&marker, b"").await?;
259            tokio::fs::remove_file(&source_path).await?;
260            // Drop any open BufWriter on the just-deleted path so
261            // the engine doesn't continue writing into the orphaned
262            // inode. Blocking lock under spawn_blocking — see the
263            // marker-found branch above for the same rationale.
264            let source_for_evict = source.clone();
265            let path_for_evict = source_path.clone();
266            let _ = tokio::task::spawn_blocking(move || {
267                source_for_evict.evict_writer_for_path(&path_for_evict)
268            })
269            .await;
270            tokio::fs::remove_file(&marker).await.ok();
271            metrics::counter!(
272                "archiver_etl_files_moved_total",
273                "source" => source_name,
274                "dest" => dest_name,
275            )
276            .increment(1);
277            anyhow::Ok(())
278        })
279        .await
280        .map_err(|_| {
281            anyhow::anyhow!("ETL move_file timed out after {timeout:?} for {source_path_disp:?}")
282        })??;
283
284        Ok(())
285    }
286}
287
288/// Group files by PV name. The PV name is derived from the file path structure.
289/// File path pattern: {root}/{pv_prefix}/{pv_suffix}:{partition}.pb
290fn group_files_by_pv(files: &[PathBuf]) -> HashMap<String, Vec<&PathBuf>> {
291    let mut groups: HashMap<String, Vec<&PathBuf>> = HashMap::new();
292    for file in files {
293        let pv_key = extract_pv_key(file);
294        groups.entry(pv_key).or_default().push(file);
295    }
296    groups
297}
298
/// Derive the grouping key for a PB partition file from its path.
///
/// The key is `<parent-dir>/<leaf>`. `<leaf>` is the part of the file name
/// before the first `:`, after dropping a trailing `.pb`. For
/// `.../SIM/Sine:2024_03_15_09.pb` this yields `"SIM/Sine"`.
///
/// Fallbacks:
/// * no usable parent directory name (e.g. a bare `Sine:2024.pb`) → just
///   the leaf;
/// * no `:` in the file name, or a non-UTF-8 file name → the whole path,
///   lossily converted to UTF-8.
///
/// Only the immediate parent directory contributes, so `X/B/C:…` and
/// `Y/B/C:…` map to the same key. NOTE(review): confirm this matches
/// `pv_name_to_key` for PVs stored more than one directory deep.
fn extract_pv_key(path: &Path) -> String {
    let leaf = path
        .file_name()
        .and_then(|name| name.to_str())
        .map(|name| name.strip_suffix(".pb").unwrap_or(name))
        .and_then(|stem| stem.split_once(':'))
        .map(|(before_colon, _)| before_colon);

    match leaf {
        None => path.to_string_lossy().to_string(),
        Some(leaf) => {
            let parent_dir = path
                .parent()
                .and_then(Path::file_name)
                .and_then(|dir| dir.to_str());
            match parent_dir {
                Some(dir) => format!("{dir}/{leaf}"),
                None => leaf.to_string(),
            }
        }
    }
}
320
321/// Recursively list all .pb files under a directory.
322fn list_pb_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
323    let mut files = Vec::new();
324    if dir.is_dir() {
325        for entry in std::fs::read_dir(dir)? {
326            let entry = entry?;
327            let path = entry.path();
328            if path.is_dir() {
329                files.extend(list_pb_files(&path)?);
330            } else if path.extension().and_then(|e| e.to_str()) == Some("pb") {
331                files.push(path);
332            }
333        }
334    }
335    Ok(files)
336}
337
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_extract_pv_key() {
        let path = PathBuf::from("/data/sts/SIM/Sine:2024_03_15_09.pb");
        assert_eq!(extract_pv_key(&path), "SIM/Sine");
    }

    /// A file with no parent directory name falls back to just the leaf.
    #[test]
    fn test_extract_pv_key_without_parent_dir() {
        assert_eq!(extract_pv_key(Path::new("Sine:2024_03_15_09.pb")), "Sine");
    }

    /// A name without the `:` partition separator falls back to the full path.
    #[test]
    fn test_extract_pv_key_without_separator_uses_full_path() {
        let path = PathBuf::from("/data/sts/SIM/README.pb");
        assert_eq!(extract_pv_key(&path), "/data/sts/SIM/README.pb");
    }

    #[test]
    fn test_group_files_by_pv() {
        let files = vec![
            PathBuf::from("/data/SIM/Sine:2024_03_01.pb"),
            PathBuf::from("/data/SIM/Sine:2024_03_02.pb"),
            PathBuf::from("/data/SIM/Cosine:2024_03_01.pb"),
        ];
        let grouped = group_files_by_pv(&files);
        assert_eq!(grouped.len(), 2);
        assert_eq!(grouped["SIM/Sine"].len(), 2);
        assert_eq!(grouped["SIM/Cosine"].len(), 1);
        // Input order is preserved inside each group (ETL relies on this
        // to move the oldest partitions first).
        assert_eq!(grouped["SIM/Sine"], vec![&files[0], &files[1]]);
    }

    #[test]
    fn test_group_files_by_pv_empty() {
        assert!(group_files_by_pv(&[]).is_empty());
    }

    /// Nested `.pb` files are found; ETL markers and other files are not.
    #[test]
    fn test_list_pb_files_recurses_and_filters() {
        let root = std::env::temp_dir().join(format!("etl_list_pb_files_{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&root);
        std::fs::create_dir_all(root.join("SIM")).unwrap();
        for name in ["top.pb", "SIM/inner.pb", "SIM/inner.pb.etl_done", "SIM/notes.txt"] {
            std::fs::write(root.join(name), b"").unwrap();
        }

        let mut found = list_pb_files(&root).unwrap();
        found.sort();
        let _ = std::fs::remove_dir_all(&root);

        assert_eq!(found, vec![root.join("SIM/inner.pb"), root.join("top.pb")]);
    }

    #[test]
    fn test_list_pb_files_missing_dir_is_empty() {
        let missing = std::env::temp_dir().join(format!("etl_missing_dir_{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&missing);
        assert!(list_pb_files(&missing).unwrap().is_empty());
    }
}