1use std::path::{Path, PathBuf};
4
5use ito_common::fs::StdFs;
6use ito_common::paths;
7use ito_domain::backend::{BackendArchiveClient, BackendChangeReader, BackendSyncClient};
8use ito_domain::changes::ChangeLifecycleFilter;
9use ito_domain::discovery;
10use ito_domain::errors::DomainError;
11
12use crate::ArtifactBundle;
13use crate::backend_sync::read_bundle_from_change_dir;
14use crate::errors::{CoreError, CoreResult};
15
/// Lifecycle bucket a locally discovered change was found in.
///
/// Local discovery tags entries from the regular change directories as
/// [`ImportLifecycle::Active`] and entries from the changes archive directory
/// as [`ImportLifecycle::Archived`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ImportLifecycle {
    /// The change was found among the active change directories.
    Active,
    /// The change was found under the changes archive directory.
    Archived,
}
24
25#[derive(Debug, Clone)]
27pub struct LocalImportChange {
28 pub change_id: String,
30 pub source_path: PathBuf,
32 pub lifecycle: ImportLifecycle,
34 pub bundle: ArtifactBundle,
36}
37
/// What a [`BackendImportSink`] did (or would do) with a single change.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ImportAction {
    /// The change was imported into the backend.
    Imported,
    /// Preview only: the change would be imported by a real run.
    Previewed,
    /// Nothing was done for this change; the payload is the reason.
    Skipped(String),
}
48
/// Per-change status recorded in an [`ImportSummary`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ImportItemStatus {
    /// The sink reported the change as imported.
    Imported,
    /// The sink reported the change as previewed.
    Previewed,
    /// The sink skipped the change.
    Skipped,
    /// The sink returned an error for the change.
    Failed,
}
61
62#[derive(Debug, Clone)]
64pub struct ImportItemResult {
65 pub change_id: String,
67 pub lifecycle: ImportLifecycle,
69 pub status: ImportItemStatus,
71 pub message: Option<String>,
73}
74
75#[derive(Debug, Clone)]
77pub struct ImportSummary {
78 pub results: Vec<ImportItemResult>,
80 pub imported: usize,
82 pub previewed: usize,
84 pub skipped: usize,
86 pub failed: usize,
88}
89
90pub trait BackendImportSink {
92 fn preview_change(&self, change: &LocalImportChange) -> CoreResult<ImportAction>;
94
95 fn import_change(&self, change: &LocalImportChange) -> CoreResult<ImportAction>;
97}
98
/// Decision computed for a single change before anything is written.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ImportPlan {
    /// Do nothing for this change; the payload is the reason shown to the user.
    Skip(String),
    /// Import the change using the selected steps.
    Import {
        /// Push the local artifact bundle to the backend.
        push_bundle: bool,
        /// Mark the change as archived in the backend (done after any push).
        mark_archived: bool,
    },
}
107
/// Import sink that works through three borrowed backend clients.
///
/// The clients are only borrowed; the sink never owns them, so it cannot
/// outlive `'a`.
pub struct RepositoryBackedImportSink<'a, R, S, A> {
    /// Used to check whether a change already exists in the backend.
    change_reader: &'a R,
    /// Used to pull the remote bundle for comparison and to push the local one.
    sync_client: &'a S,
    /// Used to mark a change as archived.
    archive_client: &'a A,
}

impl<'a, R, S, A> RepositoryBackedImportSink<'a, R, S, A> {
    /// Creates a sink that borrows the three given clients.
    pub fn new(change_reader: &'a R, sync_client: &'a S, archive_client: &'a A) -> Self {
        Self {
            change_reader,
            sync_client,
            archive_client,
        }
    }
}
125
126impl<R, S, A> BackendImportSink for RepositoryBackedImportSink<'_, R, S, A>
127where
128 R: BackendChangeReader,
129 S: BackendSyncClient,
130 A: BackendArchiveClient,
131{
132 fn preview_change(&self, change: &LocalImportChange) -> CoreResult<ImportAction> {
133 match evaluate_import_plan(self.change_reader, self.sync_client, change)? {
134 ImportPlan::Skip(message) => Ok(ImportAction::Skipped(message)),
135 ImportPlan::Import { .. } => Ok(ImportAction::Previewed),
136 }
137 }
138
139 fn import_change(&self, change: &LocalImportChange) -> CoreResult<ImportAction> {
140 match evaluate_import_plan(self.change_reader, self.sync_client, change)? {
141 ImportPlan::Import {
142 push_bundle,
143 mark_archived,
144 } => {
145 if push_bundle {
146 let mut bundle = change.bundle.clone();
147 bundle.revision.clear();
148 self.sync_client
149 .push(&change.change_id, &bundle)
150 .map_err(|err| CoreError::process(err.to_string()))?;
151 }
152
153 if mark_archived {
154 self.archive_client
155 .mark_archived(&change.change_id)
156 .map_err(|err| CoreError::process(err.to_string()))?;
157 }
158
159 Ok(ImportAction::Imported)
160 }
161 ImportPlan::Skip(message) => Ok(ImportAction::Skipped(message)),
162 }
163 }
164}
165
166pub fn import_local_changes(
168 sink: &dyn BackendImportSink,
169 ito_path: &Path,
170) -> CoreResult<ImportSummary> {
171 import_local_changes_with_options(sink, ito_path, false)
172}
173
174pub fn import_local_changes_with_options(
176 sink: &dyn BackendImportSink,
177 ito_path: &Path,
178 dry_run: bool,
179) -> CoreResult<ImportSummary> {
180 let changes = discover_local_import_changes(ito_path)?;
181 let mut results = Vec::with_capacity(changes.len());
182 let mut imported = 0;
183 let mut previewed = 0;
184 let mut skipped = 0;
185 let mut failed = 0;
186
187 for change in changes {
188 let outcome = if dry_run {
189 sink.preview_change(&change)
190 } else {
191 sink.import_change(&change)
192 };
193
194 match outcome {
195 Ok(ImportAction::Imported) => {
196 imported += 1;
197 results.push(ImportItemResult {
198 change_id: change.change_id,
199 lifecycle: change.lifecycle,
200 status: ImportItemStatus::Imported,
201 message: None,
202 });
203 }
204 Ok(ImportAction::Previewed) => {
205 previewed += 1;
206 results.push(ImportItemResult {
207 change_id: change.change_id,
208 lifecycle: change.lifecycle,
209 status: ImportItemStatus::Previewed,
210 message: None,
211 });
212 }
213 Ok(ImportAction::Skipped(message)) => {
214 skipped += 1;
215 results.push(ImportItemResult {
216 change_id: change.change_id,
217 lifecycle: change.lifecycle,
218 status: ImportItemStatus::Skipped,
219 message: Some(message),
220 });
221 }
222 Err(err) => {
223 failed += 1;
224 results.push(ImportItemResult {
225 change_id: change.change_id,
226 lifecycle: change.lifecycle,
227 status: ImportItemStatus::Failed,
228 message: Some(err.to_string()),
229 });
230 }
231 }
232 }
233
234 Ok(ImportSummary {
235 results,
236 imported,
237 previewed,
238 skipped,
239 failed,
240 })
241}
242
243fn evaluate_import_plan<R, S>(
244 change_reader: &R,
245 sync_client: &S,
246 change: &LocalImportChange,
247) -> CoreResult<ImportPlan>
248where
249 R: BackendChangeReader,
250 S: BackendSyncClient,
251{
252 let active_exists = exists_in_backend(
253 change_reader,
254 &change.change_id,
255 ChangeLifecycleFilter::Active,
256 )?;
257 let archived_exists = exists_in_backend(
258 change_reader,
259 &change.change_id,
260 ChangeLifecycleFilter::Archived,
261 )?;
262
263 match change.lifecycle {
264 ImportLifecycle::Active => {
265 if archived_exists {
266 return Err(CoreError::validation(format!(
267 "backend already contains archived change '{}' while local copy is active",
268 change.change_id
269 )));
270 }
271 if !active_exists {
272 return Ok(ImportPlan::Import {
273 push_bundle: true,
274 mark_archived: false,
275 });
276 }
277
278 if remote_bundle_matches(sync_client, change)? {
279 return Ok(ImportPlan::Skip("already imported".to_string()));
280 }
281
282 Ok(ImportPlan::Import {
283 push_bundle: true,
284 mark_archived: false,
285 })
286 }
287 ImportLifecycle::Archived => {
288 if archived_exists {
289 return Ok(ImportPlan::Skip("already archived".to_string()));
290 }
291
292 if !active_exists {
293 return Ok(ImportPlan::Import {
294 push_bundle: true,
295 mark_archived: true,
296 });
297 }
298
299 let push_bundle = !remote_bundle_matches(sync_client, change)?;
300 Ok(ImportPlan::Import {
301 push_bundle,
302 mark_archived: true,
303 })
304 }
305 }
306}
307
308fn exists_in_backend<R>(
309 change_reader: &R,
310 change_id: &str,
311 filter: ChangeLifecycleFilter,
312) -> CoreResult<bool>
313where
314 R: BackendChangeReader,
315{
316 match change_reader.get_change(change_id, filter) {
317 Ok(_) => Ok(true),
318 Err(DomainError::NotFound { .. }) => Ok(false),
319 Err(err) => Err(CoreError::from(err)),
320 }
321}
322
323fn remote_bundle_matches<S>(sync_client: &S, change: &LocalImportChange) -> CoreResult<bool>
324where
325 S: BackendSyncClient,
326{
327 let remote = sync_client
328 .pull(&change.change_id)
329 .map_err(|err| CoreError::process(err.to_string()))?;
330 Ok(artifact_bundles_match(&remote, &change.bundle))
331}
332
333fn artifact_bundles_match(left: &ArtifactBundle, right: &ArtifactBundle) -> bool {
334 left.change_id == right.change_id
335 && left.proposal == right.proposal
336 && left.design == right.design
337 && left.tasks == right.tasks
338 && left.specs == right.specs
339}
340
341fn discover_local_import_changes(ito_path: &Path) -> CoreResult<Vec<LocalImportChange>> {
342 let fs = StdFs;
343 let mut changes = Vec::new();
344
345 for change_id in discovery::list_change_dir_names(&fs, ito_path)? {
346 let path = paths::change_dir(ito_path, &change_id);
347 let bundle = read_bundle_from_change_dir(&path, &change_id)?;
348 changes.push(LocalImportChange {
349 change_id,
350 source_path: path,
351 lifecycle: ImportLifecycle::Active,
352 bundle,
353 });
354 }
355
356 let archive_dir = paths::changes_archive_dir(ito_path);
357 for archived_name in discovery::list_dir_names(&fs, &archive_dir)? {
358 let canonical_change_id = match canonical_archived_change_id(&archived_name) {
359 Ok(change_id) => change_id,
360 Err(err) => {
361 tracing::warn!(
362 archived_name = %archived_name,
363 error = %err,
364 "skipping unrecognized archived change directory during backend import"
365 );
366 continue;
367 }
368 };
369 let path = archive_dir.join(&archived_name);
370 let bundle = read_bundle_from_change_dir(&path, &canonical_change_id)?;
371 changes.push(LocalImportChange {
372 change_id: canonical_change_id,
373 source_path: path,
374 lifecycle: ImportLifecycle::Archived,
375 bundle,
376 });
377 }
378
379 changes.sort_by(|left, right| left.change_id.cmp(&right.change_id));
380 Ok(changes)
381}
382
383fn canonical_archived_change_id(archived_name: &str) -> CoreResult<String> {
384 let parts: Vec<&str> = archived_name.splitn(4, '-').collect();
385 if parts.len() != 4
386 || parts[0].len() != 4
387 || parts[1].len() != 2
388 || parts[2].len() != 2
389 || parts[3].is_empty()
390 || !parts[0].chars().all(|ch| ch.is_ascii_digit())
391 || !parts[1].chars().all(|ch| ch.is_ascii_digit())
392 || !parts[2].chars().all(|ch| ch.is_ascii_digit())
393 {
394 return Err(CoreError::validation(format!(
395 "Archived change directory has unexpected format: {archived_name}"
396 )));
397 }
398 Ok(parts[3].to_string())
399}