//! Write-mode strategies for sink part-file outputs
//! (floe_core/io/write/strategy/mod.rs).
1use std::path::Path;
2
3use crate::io::storage::Target;
4use crate::{config, io, ConfigError, FloeResult};
5
6use super::parts;
7
8mod append;
9pub(crate) mod merge;
10mod overwrite;
11
/// Which output stream a part file belongs to: accepted or rejected rows.
///
/// `format` is a static serialization-format label (e.g. "parquet", "csv");
/// it is interpolated into configuration error messages (see `prefix_error`).
#[derive(Debug, Clone, Copy)]
pub enum PartScope {
    Accepted { format: &'static str },
    Rejected { format: &'static str },
}
17
/// Describes one family of part files: their file extension plus the output
/// stream (accepted/rejected) they serve.
#[derive(Debug, Clone, Copy)]
pub struct PartSpec {
    /// File extension without the leading dot (e.g. "parquet", "csv").
    pub extension: &'static str,
    /// Output stream this spec belongs to.
    pub scope: PartScope,
}
23
/// Borrowed state a write strategy needs to allocate part names and interact
/// with the destination storage.
pub struct WriteContext<'a> {
    /// Resolved write destination (local path or cloud bucket/container).
    pub target: &'a Target,
    /// Cloud client source; held mutably so `client_for` can be called on it.
    pub cloud: &'a mut io::storage::CloudClient,
    /// Resolves storage definitions referenced by the target.
    pub resolver: &'a config::StorageResolver,
    /// Entity being written; used for client resolution and in error messages.
    pub entity: &'a config::EntityConfig,
}
30
/// Per-write-mode behavior: a strategy reports which mode it implements and
/// decides how part-file names are allocated for that mode.
pub trait ModeStrategy {
    /// The write mode this strategy implements.
    fn mode(&self) -> config::WriteMode;
    /// Produces the part-name allocator for `spec`, possibly acting on storage
    /// through `ctx` first (e.g. clearing existing parts for overwrite).
    fn part_allocator(
        &self,
        ctx: &mut WriteContext<'_>,
        spec: PartSpec,
    ) -> FloeResult<parts::PartNameAllocator>;
}
39
/// Cloud destination flavor. Within this module it carries only the
/// identifiers needed to build prefix-validation error messages
/// (see `prefix_error`).
#[derive(Debug, Clone)]
enum CloudProvider {
    S3,
    Gcs { bucket: String },
    Adls { container: String, account: String },
}
46
47pub fn strategy_for(mode: config::WriteMode) -> &'static dyn ModeStrategy {
48    match mode {
49        config::WriteMode::Overwrite => &overwrite::OVERWRITE_STRATEGY,
50        config::WriteMode::Append => &append::APPEND_STRATEGY,
51        // merge_scd* are accepted-writer specific (Delta only); rejected row outputs keep append semantics.
52        config::WriteMode::MergeScd1 => &append::APPEND_STRATEGY,
53        config::WriteMode::MergeScd2 => &append::APPEND_STRATEGY,
54    }
55}
56
57pub fn ensure_mode_supported(mode: config::WriteMode) -> FloeResult<()> {
58    match mode {
59        config::WriteMode::Overwrite => Ok(()),
60        config::WriteMode::Append => Ok(()),
61        config::WriteMode::MergeScd1 => Ok(()),
62        config::WriteMode::MergeScd2 => Ok(()),
63    }
64}
65
66pub fn accepted_parquet_spec() -> PartSpec {
67    PartSpec {
68        extension: "parquet",
69        scope: PartScope::Accepted { format: "parquet" },
70    }
71}
72
73pub fn rejected_csv_spec() -> PartSpec {
74    PartSpec {
75        extension: "csv",
76        scope: PartScope::Rejected { format: "csv" },
77    }
78}
79
80pub fn append_part_allocator(
81    _ctx: &mut WriteContext<'_>,
82    spec: PartSpec,
83) -> FloeResult<parts::PartNameAllocator> {
84    Ok(parts::PartNameAllocator::unique(spec.extension))
85}
86
87pub fn overwrite_part_allocator(
88    ctx: &mut WriteContext<'_>,
89    spec: PartSpec,
90) -> FloeResult<parts::PartNameAllocator> {
91    match ctx.target {
92        Target::Local { base_path, .. } => {
93            let base_path = Path::new(base_path);
94            let _ = parts::clear_local_part_files(base_path, spec.extension)?;
95            Ok(parts::PartNameAllocator::from_next_index(0, spec.extension))
96        }
97        Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
98            clear_cloud_parts(ctx, spec)?;
99            Ok(parts::PartNameAllocator::from_next_index(0, spec.extension))
100        }
101    }
102}
103
104fn clear_cloud_parts(ctx: &mut WriteContext<'_>, spec: PartSpec) -> FloeResult<()> {
105    let (list_prefix, objects) = list_part_objects(ctx, spec)?;
106    let client = ctx
107        .cloud
108        .client_for(ctx.resolver, ctx.target.storage(), ctx.entity)?;
109    for object in objects
110        .into_iter()
111        .filter(|obj| obj.key.starts_with(&list_prefix))
112        .filter(|obj| parts::is_part_key(&obj.key, spec.extension))
113    {
114        client.delete_object(&object.uri)?;
115    }
116    Ok(())
117}
118
/// Lists objects under the target's part prefix in cloud storage.
///
/// Returns the resolved list prefix (always `/`-terminated, see `list_prefix`)
/// together with every object the storage client reports under it. The three
/// cloud arms differ only in which target fields feed the `CloudProvider`
/// used for error reporting.
///
/// # Errors
/// - `ConfigError` if the base path/key resolves to the bucket or container
///   root (via `list_prefix`).
/// - `ConfigError` if called for a `Target::Local` — cloud listing is not
///   meaningful for local targets.
/// - Any error from client resolution or the listing call itself.
pub(crate) fn list_part_objects(
    ctx: &mut WriteContext<'_>,
    spec: PartSpec,
) -> FloeResult<(String, Vec<io::storage::ObjectRef>)> {
    match ctx.target {
        Target::S3 {
            storage, base_key, ..
        } => {
            // S3 error messages need no extra identifiers.
            let provider = CloudProvider::S3;
            let list_prefix = list_prefix(ctx.entity, base_key, &provider, spec)?;
            let client = ctx.cloud.client_for(ctx.resolver, storage, ctx.entity)?;
            let objects = client.list(&list_prefix)?;
            Ok((list_prefix, objects))
        }
        Target::Gcs {
            storage,
            bucket,
            base_key,
            ..
        } => {
            // Carry the bucket name so prefix errors can mention it.
            let provider = CloudProvider::Gcs {
                bucket: bucket.clone(),
            };
            let list_prefix = list_prefix(ctx.entity, base_key, &provider, spec)?;
            let client = ctx.cloud.client_for(ctx.resolver, storage, ctx.entity)?;
            let objects = client.list(&list_prefix)?;
            Ok((list_prefix, objects))
        }
        Target::Adls {
            storage,
            container,
            account,
            base_path,
            ..
        } => {
            // Carry container and account so prefix errors can mention both.
            let provider = CloudProvider::Adls {
                container: container.clone(),
                account: account.clone(),
            };
            let list_prefix = list_prefix(ctx.entity, base_path, &provider, spec)?;
            let client = ctx.cloud.client_for(ctx.resolver, storage, ctx.entity)?;
            let objects = client.list(&list_prefix)?;
            Ok((list_prefix, objects))
        }
        Target::Local { .. } => Err(Box::new(ConfigError(
            "cloud part listing requested for local target".to_string(),
        ))),
    }
}
168
169fn list_prefix(
170    entity: &config::EntityConfig,
171    base_path: &str,
172    provider: &CloudProvider,
173    spec: PartSpec,
174) -> FloeResult<String> {
175    let prefix = base_path.trim_matches('/');
176    if prefix.is_empty() {
177        return Err(Box::new(prefix_error(entity, provider, spec)));
178    }
179    Ok(format!("{prefix}/"))
180}
181
182fn prefix_error(
183    entity: &config::EntityConfig,
184    provider: &CloudProvider,
185    spec: PartSpec,
186) -> ConfigError {
187    match (&spec.scope, provider) {
188        (PartScope::Accepted { format }, CloudProvider::S3) => ConfigError(format!(
189            "entity.name={} sink.accepted.path must not be bucket root for s3 {format} outputs",
190            entity.name
191        )),
192        (PartScope::Accepted { format }, CloudProvider::Gcs { bucket }) => ConfigError(format!(
193            "entity.name={} sink.accepted.path must not be bucket root for gcs {format} outputs (bucket={})",
194            entity.name, bucket
195        )),
196        (PartScope::Accepted { format }, CloudProvider::Adls { container, account }) => {
197            ConfigError(format!(
198                "entity.name={} sink.accepted.path must not be container root for adls {format} outputs (container={}, account={})",
199                entity.name, container, account
200            ))
201        }
202        (PartScope::Rejected { .. }, CloudProvider::S3) => ConfigError(format!(
203            "entity.name={} sink.rejected.path must not be bucket root for s3 outputs",
204            entity.name
205        )),
206        (PartScope::Rejected { .. }, CloudProvider::Gcs { .. }) => ConfigError(format!(
207            "entity.name={} sink.rejected.path must not be bucket root for gcs outputs",
208            entity.name
209        )),
210        (PartScope::Rejected { .. }, CloudProvider::Adls { .. }) => ConfigError(format!(
211            "entity.name={} sink.rejected.path must not be container root for adls outputs",
212            entity.name
213        )),
214    }
215}