Skip to main content

archiver_core/etl/
executor.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use tokio::time::interval;
7use tracing::{debug, error, info, warn};
8
// Default upper bound for moving a single file. Java parity (3daedae):
// `f.get()` without a timeout hung indefinitely on slow NFS, so the move is
// bounded; 24 h matches the bound Java chose.
const DEFAULT_MOVE_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
12
13use crate::registry::{PvRegistry, PvStatus};
14use crate::storage::plainpb::PlainPbStoragePlugin;
15use crate::storage::plainpb::reader::PbFileReader;
16use crate::storage::traits::{EventStream, StoragePlugin};
17
/// ETL executor — periodically moves data from source tier to destination tier.
///
/// Both tiers are `PlainPbStoragePlugin` instances shared through `Arc`; the
/// executor reads `.pb` files under the source root and appends their events
/// to the destination.
pub struct EtlExecutor {
    /// Tier that files are read from and deleted out of.
    source: Arc<PlainPbStoragePlugin>,
    /// Tier that events are appended to.
    dest: Arc<PlainPbStoragePlugin>,
    /// How often to run ETL (seconds).
    period_secs: u64,
    /// Number of partitions to hold in source before moving.
    hold: u32,
    /// Number of partitions to gather (move out) at once.
    gather: u32,
    /// Per-file move timeout (Java parity 3daedae). Initialised to
    /// `DEFAULT_MOVE_TIMEOUT` by `new`.
    move_timeout: Duration,
    /// Optional PV registry — when present, paused PVs are skipped
    /// (Java parity 92db337).
    pv_registry: Option<Arc<PvRegistry>>,
}
34
35impl EtlExecutor {
36    pub fn new(
37        source: Arc<PlainPbStoragePlugin>,
38        dest: Arc<PlainPbStoragePlugin>,
39        period_secs: u64,
40        hold: u32,
41        gather: u32,
42    ) -> Self {
43        Self {
44            source,
45            dest,
46            period_secs,
47            hold,
48            gather,
49            move_timeout: DEFAULT_MOVE_TIMEOUT,
50            pv_registry: None,
51        }
52    }
53
54    /// Wire a PV registry so the executor can skip paused PVs in
55    /// `run_once`. Java parity (92db337): without this, PB files for a
56    /// paused PV continue to migrate out of the STS, which surprises
57    /// operators who expect the data to stay accessible there until the
58    /// PV resumes.
59    pub fn with_pv_registry(mut self, registry: Arc<PvRegistry>) -> Self {
60        self.pv_registry = Some(registry);
61        self
62    }
63
64    /// Run the ETL loop. Call this as a spawned task.
65    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
66        let mut tick = interval(Duration::from_secs(self.period_secs));
67        info!(
68            source = self.source.name(),
69            dest = self.dest.name(),
70            "ETL executor started"
71        );
72
73        loop {
74            tokio::select! {
75                _ = tick.tick() => {
76                    if let Err(e) = self.run_once().await {
77                        error!("ETL error: {e}");
78                    }
79                }
80                _ = shutdown.changed() => {
81                    info!("ETL executor shutting down");
82                    break;
83                }
84            }
85        }
86    }
87
88    /// Execute one round of ETL: find old partition files in source, move to dest.
89    /// Groups files by PV name for coherent transfers.
90    async fn run_once(&self) -> anyhow::Result<()> {
91        // Operator-controlled bypass (Java's SKIP_<NAME>_FOR_ETL named flag,
92        // adc5889a). Set during e.g. an OS migration to pause writes into a
93        // particular tier without restarting the appliance.
94        if crate::flags::skip_tier_for_etl(self.dest.name()) {
95            debug!(
96                dest = self.dest.name(),
97                "ETL skipped: SKIP_<DEST>_FOR_ETL flag set"
98            );
99            return Ok(());
100        }
101
102        let source_root = self.source.root_folder();
103        if !source_root.exists() {
104            return Ok(());
105        }
106
107        let mut pb_files = list_pb_files(source_root)?;
108        pb_files.sort();
109
110        if pb_files.len() <= self.hold as usize {
111            debug!(
112                count = pb_files.len(),
113                hold = self.hold,
114                "Not enough partitions to trigger ETL"
115            );
116            return Ok(());
117        }
118
119        let files_to_move = pb_files
120            .len()
121            .saturating_sub(self.hold as usize)
122            .min(self.gather as usize);
123
124        // Group files by PV name for coherent per-PV transfers.
125        let files_subset: Vec<PathBuf> = pb_files.into_iter().take(files_to_move).collect();
126        let grouped = group_files_by_pv(&files_subset);
127
128        // Java parity (92db337): skip files whose owning PV is paused.
129        // Compute the set of paused-PV file keys once per tick instead of
130        // per-file to keep the registry lookup off the hot path.
131        let paused_keys: HashSet<String> = match self.pv_registry.as_ref() {
132            Some(reg) => reg
133                .pvs_by_status(PvStatus::Paused)
134                .map(|recs| {
135                    recs.into_iter()
136                        .map(|r| crate::storage::plainpb::pv_name_to_key(&r.pv_name))
137                        .collect()
138                })
139                .unwrap_or_else(|e| {
140                    warn!("ETL: failed to read paused PVs from registry: {e}");
141                    HashSet::new()
142                }),
143            None => HashSet::new(),
144        };
145
146        for (pv_key, files) in &grouped {
147            if paused_keys.contains(pv_key) {
148                debug!(pv = pv_key, "ETL skipping paused PV");
149                continue;
150            }
151            debug!(pv = pv_key, count = files.len(), "ETL processing PV group");
152            for file in files {
153                info!(?file, dest = self.dest.name(), "ETL moving file");
154                if let Err(e) = self.move_file(file).await {
155                    warn!(?file, "ETL failed to move file: {e}");
156                }
157            }
158        }
159
160        Ok(())
161    }
162
163    pub fn source_name(&self) -> &str {
164        self.source.name()
165    }
166
167    pub fn dest_name(&self) -> &str {
168        self.dest.name()
169    }
170
171    /// Force-move every PB file the source tier currently holds for `pv`
172    /// to the destination tier, ignoring `hold` / `gather` constraints.
173    /// Drives the `consolidateDataForPV` BPL endpoint.
174    ///
175    /// The same crash-safe move_file is reused, so partial failures leave
176    /// the source either fully migrated or untouched.
177    pub async fn consolidate_pv(&self, pv: &str) -> anyhow::Result<u64> {
178        // Flush any buffered writes for the source tier so we move
179        // everything that's been written so far.
180        self.source.flush_writes().await?;
181
182        let pv_files =
183            crate::storage::plainpb::list_pv_pb_files_pub(self.source.root_folder(), pv)?;
184        let total = pv_files.len() as u64;
185        info!(
186            pv,
187            total,
188            source = self.source.name(),
189            dest = self.dest.name(),
190            "Consolidating PV files",
191        );
192        for file in &pv_files {
193            if let Err(e) = self.move_file(file).await {
194                warn!(?file, "consolidate_pv: failed to move file: {e}");
195                return Err(e);
196            }
197        }
198        Ok(total)
199    }
200
201    /// Move a single PB file from source to destination tier.
202    /// Uses copy → verify → marker → delete for crash-safe idempotency.
203    async fn move_file(&self, source_path: &Path) -> anyhow::Result<()> {
204        // Check for a marker from a previous incomplete cleanup (crash after copy).
205        let marker = source_path.with_extension("pb.etl_done");
206        if marker.exists() {
207            info!(
208                ?source_path,
209                "Found ETL marker — previous copy completed, cleaning up"
210            );
211            if let Err(e) = tokio::fs::remove_file(source_path).await {
212                warn!(
213                    ?source_path,
214                    "Failed to remove source after ETL marker found: {e}"
215                );
216            }
217            if let Err(e) = tokio::fs::remove_file(&marker).await {
218                warn!(?marker, "Failed to remove ETL marker: {e}");
219            }
220            return Ok(());
221        }
222
223        // Java parity (3daedae): wrap the copy+delete in a timeout so a
224        // hung NFS mount doesn't block the ETL loop indefinitely.
225        let timeout = self.move_timeout;
226        let source_path = source_path.to_path_buf();
227        let source_path_disp = source_path.clone();
228        let dest = self.dest.clone();
229        let source_name = self.source.name().to_string();
230        let dest_name = self.dest.name().to_string();
231
232        tokio::time::timeout(timeout, async move {
233            let mut reader = PbFileReader::open(&source_path)?;
234            let desc = reader.description().clone();
235            let dbr_type = desc.db_type;
236
237            // Copy all samples to destination.
238            while let Some(sample) = reader.next_event()? {
239                dest.append_event(&desc.pv_name, dbr_type, &sample).await?;
240            }
241
242            // Write marker before deleting source — if we crash here, next run
243            // will see the marker and skip re-copy (preventing duplicate data).
244            let marker = source_path.with_extension("pb.etl_done");
245            tokio::fs::write(&marker, b"").await?;
246            tokio::fs::remove_file(&source_path).await?;
247            tokio::fs::remove_file(&marker).await.ok();
248            metrics::counter!(
249                "archiver_etl_files_moved_total",
250                "source" => source_name,
251                "dest" => dest_name,
252            )
253            .increment(1);
254            anyhow::Ok(())
255        })
256        .await
257        .map_err(|_| {
258            anyhow::anyhow!("ETL move_file timed out after {timeout:?} for {source_path_disp:?}")
259        })??;
260
261        Ok(())
262    }
263}
264
265/// Group files by PV name. The PV name is derived from the file path structure.
266/// File path pattern: {root}/{pv_prefix}/{pv_suffix}:{partition}.pb
267fn group_files_by_pv(files: &[PathBuf]) -> HashMap<String, Vec<&PathBuf>> {
268    let mut groups: HashMap<String, Vec<&PathBuf>> = HashMap::new();
269    for file in files {
270        let pv_key = extract_pv_key(file);
271        groups.entry(pv_key).or_default().push(file);
272    }
273    groups
274}
275
/// Extract PV key from a PB file path.
/// Given path like `.../SIM/Sine:2024_03_15_09.pb`, returns "SIM/Sine".
///
/// The key is the name of the immediate parent directory joined by `/` with
/// the part of the file name before its first `:`. Without a usable parent
/// directory name only that leading part is returned. If the file name has
/// no `:` (or is not valid UTF-8), the whole path is returned as a lossy
/// string.
fn extract_pv_key(path: &Path) -> String {
    // file_name is like "Sine:2024_03_15_09.pb"; the leaf is everything
    // before the first colon once a trailing ".pb" has been dropped.
    let leaf = path
        .file_name()
        .and_then(|name| name.to_str())
        .and_then(|name| {
            let stem = name.strip_suffix(".pb").unwrap_or(name);
            stem.split_once(':').map(|(leaf, _partition)| leaf)
        });

    match leaf {
        Some(leaf) => {
            // Combine with parent directory name for full PV key.
            let dir = path
                .parent()
                .and_then(|p| p.file_name())
                .and_then(|d| d.to_str());
            match dir {
                Some(dir) => format!("{dir}/{leaf}"),
                None => leaf.to_string(),
            }
        }
        None => path.to_string_lossy().to_string(),
    }
}
297
298/// Recursively list all .pb files under a directory.
299fn list_pb_files(dir: &Path) -> anyhow::Result<Vec<PathBuf>> {
300    let mut files = Vec::new();
301    if dir.is_dir() {
302        for entry in std::fs::read_dir(dir)? {
303            let entry = entry?;
304            let path = entry.path();
305            if path.is_dir() {
306                files.extend(list_pb_files(&path)?);
307            } else if path.extension().and_then(|e| e.to_str()) == Some("pb") {
308                files.push(path);
309            }
310        }
311    }
312    Ok(files)
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// The key is "<parent dir>/<file name up to the first colon>".
    #[test]
    fn test_extract_pv_key() {
        let key = extract_pv_key(Path::new("/data/sts/SIM/Sine:2024_03_15_09.pb"));
        assert_eq!(key, "SIM/Sine");
    }

    /// Two partitions of one PV share a group; a second PV gets its own.
    #[test]
    fn test_group_files_by_pv() {
        let files: Vec<PathBuf> = [
            "/data/SIM/Sine:2024_03_01.pb",
            "/data/SIM/Sine:2024_03_02.pb",
            "/data/SIM/Cosine:2024_03_01.pb",
        ]
        .iter()
        .map(|p| PathBuf::from(p))
        .collect();

        let grouped = group_files_by_pv(&files);
        assert_eq!(grouped.len(), 2);
        assert_eq!(grouped.get("SIM/Sine").map(|g| g.len()), Some(2));
        assert_eq!(grouped.get("SIM/Cosine").map(|g| g.len()), Some(1));
    }
}
337}