floe_core/io/write/strategy/
mod.rs1use std::path::Path;
2
3use crate::io::storage::Target;
4use crate::{config, io, ConfigError, FloeResult};
5
6use super::parts;
7
8mod append;
9pub(crate) mod merge;
10mod overwrite;
11
/// Which sink a family of part files belongs to, together with the data
/// format written for that sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PartScope {
    /// Part files written for the accepted sink (`sink.accepted.path`).
    Accepted { format: &'static str },
    /// Part files written for the rejected sink (`sink.rejected.path`).
    Rejected { format: &'static str },
}

/// Describes a family of part files: the extension used to recognise them
/// and the sink scope they are written for.
///
/// This is a plain `Copy` value. It derives `PartialEq`/`Eq`/`Hash` so callers
/// can compare specs or use them as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PartSpec {
    /// File extension of the part files, without a leading dot (e.g. `"parquet"`).
    pub extension: &'static str,
    /// Sink scope (and format) these parts belong to.
    pub scope: PartScope,
}
23
24pub struct WriteContext<'a> {
25 pub target: &'a Target,
26 pub cloud: &'a mut io::storage::CloudClient,
27 pub resolver: &'a config::StorageResolver,
28 pub entity: &'a config::EntityConfig,
29}
30
31pub trait ModeStrategy {
32 fn mode(&self) -> config::WriteMode;
33 fn part_allocator(
34 &self,
35 ctx: &mut WriteContext<'_>,
36 spec: PartSpec,
37 ) -> FloeResult<parts::PartNameAllocator>;
38}
39
/// Cloud backend a listing prefix is being computed for. Carries the
/// identifiers that some root-prefix error messages echo back.
#[derive(Clone, Debug)]
enum CloudProvider {
    /// Amazon S3. No extra identifiers are carried.
    S3,
    /// Google Cloud Storage, identified by bucket.
    Gcs { bucket: String },
    /// Azure Data Lake Storage, identified by container and account.
    Adls { container: String, account: String },
}
46
47pub fn strategy_for(mode: config::WriteMode) -> &'static dyn ModeStrategy {
48 match mode {
49 config::WriteMode::Overwrite => &overwrite::OVERWRITE_STRATEGY,
50 config::WriteMode::Append => &append::APPEND_STRATEGY,
51 config::WriteMode::MergeScd1 => &append::APPEND_STRATEGY,
53 config::WriteMode::MergeScd2 => &append::APPEND_STRATEGY,
54 }
55}
56
57pub fn ensure_mode_supported(mode: config::WriteMode) -> FloeResult<()> {
58 match mode {
59 config::WriteMode::Overwrite => Ok(()),
60 config::WriteMode::Append => Ok(()),
61 config::WriteMode::MergeScd1 => Ok(()),
62 config::WriteMode::MergeScd2 => Ok(()),
63 }
64}
65
66pub fn accepted_parquet_spec() -> PartSpec {
67 PartSpec {
68 extension: "parquet",
69 scope: PartScope::Accepted { format: "parquet" },
70 }
71}
72
73pub fn rejected_csv_spec() -> PartSpec {
74 PartSpec {
75 extension: "csv",
76 scope: PartScope::Rejected { format: "csv" },
77 }
78}
79
80pub fn append_part_allocator(
81 _ctx: &mut WriteContext<'_>,
82 spec: PartSpec,
83) -> FloeResult<parts::PartNameAllocator> {
84 Ok(parts::PartNameAllocator::unique(spec.extension))
85}
86
87pub fn overwrite_part_allocator(
88 ctx: &mut WriteContext<'_>,
89 spec: PartSpec,
90) -> FloeResult<parts::PartNameAllocator> {
91 match ctx.target {
92 Target::Local { base_path, .. } => {
93 let base_path = Path::new(base_path);
94 let _ = parts::clear_local_part_files(base_path, spec.extension)?;
95 Ok(parts::PartNameAllocator::from_next_index(0, spec.extension))
96 }
97 Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
98 clear_cloud_parts(ctx, spec)?;
99 Ok(parts::PartNameAllocator::from_next_index(0, spec.extension))
100 }
101 }
102}
103
104fn clear_cloud_parts(ctx: &mut WriteContext<'_>, spec: PartSpec) -> FloeResult<()> {
105 let (list_prefix, objects) = list_part_objects(ctx, spec)?;
106 let client = ctx
107 .cloud
108 .client_for(ctx.resolver, ctx.target.storage(), ctx.entity)?;
109 for object in objects
110 .into_iter()
111 .filter(|obj| obj.key.starts_with(&list_prefix))
112 .filter(|obj| parts::is_part_key(&obj.key, spec.extension))
113 {
114 client.delete_object(&object.uri)?;
115 }
116 Ok(())
117}
118
119pub(crate) fn list_part_objects(
120 ctx: &mut WriteContext<'_>,
121 spec: PartSpec,
122) -> FloeResult<(String, Vec<io::storage::ObjectRef>)> {
123 match ctx.target {
124 Target::S3 {
125 storage, base_key, ..
126 } => {
127 let provider = CloudProvider::S3;
128 let list_prefix = list_prefix(ctx.entity, base_key, &provider, spec)?;
129 let client = ctx.cloud.client_for(ctx.resolver, storage, ctx.entity)?;
130 let objects = client.list(&list_prefix)?;
131 Ok((list_prefix, objects))
132 }
133 Target::Gcs {
134 storage,
135 bucket,
136 base_key,
137 ..
138 } => {
139 let provider = CloudProvider::Gcs {
140 bucket: bucket.clone(),
141 };
142 let list_prefix = list_prefix(ctx.entity, base_key, &provider, spec)?;
143 let client = ctx.cloud.client_for(ctx.resolver, storage, ctx.entity)?;
144 let objects = client.list(&list_prefix)?;
145 Ok((list_prefix, objects))
146 }
147 Target::Adls {
148 storage,
149 container,
150 account,
151 base_path,
152 ..
153 } => {
154 let provider = CloudProvider::Adls {
155 container: container.clone(),
156 account: account.clone(),
157 };
158 let list_prefix = list_prefix(ctx.entity, base_path, &provider, spec)?;
159 let client = ctx.cloud.client_for(ctx.resolver, storage, ctx.entity)?;
160 let objects = client.list(&list_prefix)?;
161 Ok((list_prefix, objects))
162 }
163 Target::Local { .. } => Err(Box::new(ConfigError(
164 "cloud part listing requested for local target".to_string(),
165 ))),
166 }
167}
168
169fn list_prefix(
170 entity: &config::EntityConfig,
171 base_path: &str,
172 provider: &CloudProvider,
173 spec: PartSpec,
174) -> FloeResult<String> {
175 let prefix = base_path.trim_matches('/');
176 if prefix.is_empty() {
177 return Err(Box::new(prefix_error(entity, provider, spec)));
178 }
179 Ok(format!("{prefix}/"))
180}
181
182fn prefix_error(
183 entity: &config::EntityConfig,
184 provider: &CloudProvider,
185 spec: PartSpec,
186) -> ConfigError {
187 match (&spec.scope, provider) {
188 (PartScope::Accepted { format }, CloudProvider::S3) => ConfigError(format!(
189 "entity.name={} sink.accepted.path must not be bucket root for s3 {format} outputs",
190 entity.name
191 )),
192 (PartScope::Accepted { format }, CloudProvider::Gcs { bucket }) => ConfigError(format!(
193 "entity.name={} sink.accepted.path must not be bucket root for gcs {format} outputs (bucket={})",
194 entity.name, bucket
195 )),
196 (PartScope::Accepted { format }, CloudProvider::Adls { container, account }) => {
197 ConfigError(format!(
198 "entity.name={} sink.accepted.path must not be container root for adls {format} outputs (container={}, account={})",
199 entity.name, container, account
200 ))
201 }
202 (PartScope::Rejected { .. }, CloudProvider::S3) => ConfigError(format!(
203 "entity.name={} sink.rejected.path must not be bucket root for s3 outputs",
204 entity.name
205 )),
206 (PartScope::Rejected { .. }, CloudProvider::Gcs { .. }) => ConfigError(format!(
207 "entity.name={} sink.rejected.path must not be bucket root for gcs outputs",
208 entity.name
209 )),
210 (PartScope::Rejected { .. }, CloudProvider::Adls { .. }) => ConfigError(format!(
211 "entity.name={} sink.rejected.path must not be container root for adls outputs",
212 entity.name
213 )),
214 }
215}