Skip to main content

ito_core/
backend_import.rs

1//! Local-to-backend import orchestration.
2
3use std::path::{Path, PathBuf};
4
5use ito_common::fs::StdFs;
6use ito_common::paths;
7use ito_domain::backend::{BackendArchiveClient, BackendChangeReader, BackendSyncClient};
8use ito_domain::changes::ChangeLifecycleFilter;
9use ito_domain::discovery;
10use ito_domain::errors::DomainError;
11
12use crate::ArtifactBundle;
13use crate::backend_sync::read_bundle_from_change_dir;
14use crate::errors::{CoreError, CoreResult};
15
16/// Lifecycle state for a locally discovered change import.
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum ImportLifecycle {
19    /// The change is still active in `.ito/changes/`.
20    Active,
21    /// The change is archived under `.ito/changes/archive/`.
22    Archived,
23}
24
25/// A local change ready to be imported into backend-managed state.
26#[derive(Debug, Clone)]
27pub struct LocalImportChange {
28    /// Canonical change identifier.
29    pub change_id: String,
30    /// Source path on disk.
31    pub source_path: PathBuf,
32    /// Lifecycle state of the source change.
33    pub lifecycle: ImportLifecycle,
34    /// Artifact bundle prepared from local files.
35    pub bundle: ArtifactBundle,
36}
37
38/// Sink result for a single import item.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub enum ImportAction {
41    /// The item was imported successfully.
42    Imported,
43    /// The item would be imported during a dry run.
44    Previewed,
45    /// The item was intentionally skipped.
46    Skipped(String),
47}
48
49/// Final status for one imported item.
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum ImportItemStatus {
52    /// The change was imported.
53    Imported,
54    /// The change would be imported during a dry run.
55    Previewed,
56    /// The change was skipped.
57    Skipped,
58    /// The change failed to import.
59    Failed,
60}
61
62/// Outcome for a single processed import item.
63#[derive(Debug, Clone)]
64pub struct ImportItemResult {
65    /// Canonical change identifier.
66    pub change_id: String,
67    /// Lifecycle state of the imported change.
68    pub lifecycle: ImportLifecycle,
69    /// Final status.
70    pub status: ImportItemStatus,
71    /// Additional detail for skipped or failed items.
72    pub message: Option<String>,
73}
74
75/// Summary returned by the import orchestration.
76#[derive(Debug, Clone)]
77pub struct ImportSummary {
78    /// Per-item results in deterministic processing order.
79    pub results: Vec<ImportItemResult>,
80    /// Number of imported items.
81    pub imported: usize,
82    /// Number of previewed items during a dry run.
83    pub previewed: usize,
84    /// Number of skipped items.
85    pub skipped: usize,
86    /// Number of failed items.
87    pub failed: usize,
88}
89
90/// Output port for applying one imported change to backend-managed state.
91pub trait BackendImportSink {
92    /// Preview how a single local change would be handled without mutating backend state.
93    fn preview_change(&self, change: &LocalImportChange) -> CoreResult<ImportAction>;
94
95    /// Import a single local change.
96    fn import_change(&self, change: &LocalImportChange) -> CoreResult<ImportAction>;
97}
98
99#[derive(Debug, Clone, PartialEq, Eq)]
100enum ImportPlan {
101    Skip(String),
102    Import {
103        push_bundle: bool,
104        mark_archived: bool,
105    },
106}
107
108/// Import sink backed by the existing backend read/sync/archive ports.
109pub struct RepositoryBackedImportSink<'a, R, S, A> {
110    change_reader: &'a R,
111    sync_client: &'a S,
112    archive_client: &'a A,
113}
114
115impl<'a, R, S, A> RepositoryBackedImportSink<'a, R, S, A> {
116    /// Create a sink backed by existing backend repository clients.
117    pub fn new(change_reader: &'a R, sync_client: &'a S, archive_client: &'a A) -> Self {
118        Self {
119            change_reader,
120            sync_client,
121            archive_client,
122        }
123    }
124}
125
126impl<R, S, A> BackendImportSink for RepositoryBackedImportSink<'_, R, S, A>
127where
128    R: BackendChangeReader,
129    S: BackendSyncClient,
130    A: BackendArchiveClient,
131{
132    fn preview_change(&self, change: &LocalImportChange) -> CoreResult<ImportAction> {
133        match evaluate_import_plan(self.change_reader, self.sync_client, change)? {
134            ImportPlan::Skip(message) => Ok(ImportAction::Skipped(message)),
135            ImportPlan::Import { .. } => Ok(ImportAction::Previewed),
136        }
137    }
138
139    fn import_change(&self, change: &LocalImportChange) -> CoreResult<ImportAction> {
140        match evaluate_import_plan(self.change_reader, self.sync_client, change)? {
141            ImportPlan::Import {
142                push_bundle,
143                mark_archived,
144            } => {
145                if push_bundle {
146                    let mut bundle = change.bundle.clone();
147                    bundle.revision.clear();
148                    self.sync_client
149                        .push(&change.change_id, &bundle)
150                        .map_err(|err| CoreError::process(err.to_string()))?;
151                }
152
153                if mark_archived {
154                    self.archive_client
155                        .mark_archived(&change.change_id)
156                        .map_err(|err| CoreError::process(err.to_string()))?;
157                }
158
159                Ok(ImportAction::Imported)
160            }
161            ImportPlan::Skip(message) => Ok(ImportAction::Skipped(message)),
162        }
163    }
164}
165
166/// Discover local active and archived changes, then import them through a sink.
167pub fn import_local_changes(
168    sink: &dyn BackendImportSink,
169    ito_path: &Path,
170) -> CoreResult<ImportSummary> {
171    import_local_changes_with_options(sink, ito_path, false)
172}
173
174/// Discover local changes, then import or preview them through a sink.
175pub fn import_local_changes_with_options(
176    sink: &dyn BackendImportSink,
177    ito_path: &Path,
178    dry_run: bool,
179) -> CoreResult<ImportSummary> {
180    let changes = discover_local_import_changes(ito_path)?;
181    let mut results = Vec::with_capacity(changes.len());
182    let mut imported = 0;
183    let mut previewed = 0;
184    let mut skipped = 0;
185    let mut failed = 0;
186
187    for change in changes {
188        let outcome = if dry_run {
189            sink.preview_change(&change)
190        } else {
191            sink.import_change(&change)
192        };
193
194        match outcome {
195            Ok(ImportAction::Imported) => {
196                imported += 1;
197                results.push(ImportItemResult {
198                    change_id: change.change_id,
199                    lifecycle: change.lifecycle,
200                    status: ImportItemStatus::Imported,
201                    message: None,
202                });
203            }
204            Ok(ImportAction::Previewed) => {
205                previewed += 1;
206                results.push(ImportItemResult {
207                    change_id: change.change_id,
208                    lifecycle: change.lifecycle,
209                    status: ImportItemStatus::Previewed,
210                    message: None,
211                });
212            }
213            Ok(ImportAction::Skipped(message)) => {
214                skipped += 1;
215                results.push(ImportItemResult {
216                    change_id: change.change_id,
217                    lifecycle: change.lifecycle,
218                    status: ImportItemStatus::Skipped,
219                    message: Some(message),
220                });
221            }
222            Err(err) => {
223                failed += 1;
224                results.push(ImportItemResult {
225                    change_id: change.change_id,
226                    lifecycle: change.lifecycle,
227                    status: ImportItemStatus::Failed,
228                    message: Some(err.to_string()),
229                });
230            }
231        }
232    }
233
234    Ok(ImportSummary {
235        results,
236        imported,
237        previewed,
238        skipped,
239        failed,
240    })
241}
242
243fn evaluate_import_plan<R, S>(
244    change_reader: &R,
245    sync_client: &S,
246    change: &LocalImportChange,
247) -> CoreResult<ImportPlan>
248where
249    R: BackendChangeReader,
250    S: BackendSyncClient,
251{
252    let active_exists = exists_in_backend(
253        change_reader,
254        &change.change_id,
255        ChangeLifecycleFilter::Active,
256    )?;
257    let archived_exists = exists_in_backend(
258        change_reader,
259        &change.change_id,
260        ChangeLifecycleFilter::Archived,
261    )?;
262
263    match change.lifecycle {
264        ImportLifecycle::Active => {
265            if archived_exists {
266                return Err(CoreError::validation(format!(
267                    "backend already contains archived change '{}' while local copy is active",
268                    change.change_id
269                )));
270            }
271            if !active_exists {
272                return Ok(ImportPlan::Import {
273                    push_bundle: true,
274                    mark_archived: false,
275                });
276            }
277
278            if remote_bundle_matches(sync_client, change)? {
279                return Ok(ImportPlan::Skip("already imported".to_string()));
280            }
281
282            Ok(ImportPlan::Import {
283                push_bundle: true,
284                mark_archived: false,
285            })
286        }
287        ImportLifecycle::Archived => {
288            if archived_exists {
289                return Ok(ImportPlan::Skip("already archived".to_string()));
290            }
291
292            if !active_exists {
293                return Ok(ImportPlan::Import {
294                    push_bundle: true,
295                    mark_archived: true,
296                });
297            }
298
299            let push_bundle = !remote_bundle_matches(sync_client, change)?;
300            Ok(ImportPlan::Import {
301                push_bundle,
302                mark_archived: true,
303            })
304        }
305    }
306}
307
308fn exists_in_backend<R>(
309    change_reader: &R,
310    change_id: &str,
311    filter: ChangeLifecycleFilter,
312) -> CoreResult<bool>
313where
314    R: BackendChangeReader,
315{
316    match change_reader.get_change(change_id, filter) {
317        Ok(_) => Ok(true),
318        Err(DomainError::NotFound { .. }) => Ok(false),
319        Err(err) => Err(CoreError::from(err)),
320    }
321}
322
323fn remote_bundle_matches<S>(sync_client: &S, change: &LocalImportChange) -> CoreResult<bool>
324where
325    S: BackendSyncClient,
326{
327    let remote = sync_client
328        .pull(&change.change_id)
329        .map_err(|err| CoreError::process(err.to_string()))?;
330    Ok(artifact_bundles_match(&remote, &change.bundle))
331}
332
333fn artifact_bundles_match(left: &ArtifactBundle, right: &ArtifactBundle) -> bool {
334    left.change_id == right.change_id
335        && left.proposal == right.proposal
336        && left.design == right.design
337        && left.tasks == right.tasks
338        && left.specs == right.specs
339}
340
341fn discover_local_import_changes(ito_path: &Path) -> CoreResult<Vec<LocalImportChange>> {
342    let fs = StdFs;
343    let mut changes = Vec::new();
344
345    for change_id in discovery::list_change_dir_names(&fs, ito_path)? {
346        let path = paths::change_dir(ito_path, &change_id);
347        let bundle = read_bundle_from_change_dir(&path, &change_id)?;
348        changes.push(LocalImportChange {
349            change_id,
350            source_path: path,
351            lifecycle: ImportLifecycle::Active,
352            bundle,
353        });
354    }
355
356    let archive_dir = paths::changes_archive_dir(ito_path);
357    for archived_name in discovery::list_dir_names(&fs, &archive_dir)? {
358        let canonical_change_id = match canonical_archived_change_id(&archived_name) {
359            Ok(change_id) => change_id,
360            Err(err) => {
361                tracing::warn!(
362                    archived_name = %archived_name,
363                    error = %err,
364                    "skipping unrecognized archived change directory during backend import"
365                );
366                continue;
367            }
368        };
369        let path = archive_dir.join(&archived_name);
370        let bundle = read_bundle_from_change_dir(&path, &canonical_change_id)?;
371        changes.push(LocalImportChange {
372            change_id: canonical_change_id,
373            source_path: path,
374            lifecycle: ImportLifecycle::Archived,
375            bundle,
376        });
377    }
378
379    changes.sort_by(|left, right| left.change_id.cmp(&right.change_id));
380    Ok(changes)
381}
382
383fn canonical_archived_change_id(archived_name: &str) -> CoreResult<String> {
384    let parts: Vec<&str> = archived_name.splitn(4, '-').collect();
385    if parts.len() != 4
386        || parts[0].len() != 4
387        || parts[1].len() != 2
388        || parts[2].len() != 2
389        || parts[3].is_empty()
390        || !parts[0].chars().all(|ch| ch.is_ascii_digit())
391        || !parts[1].chars().all(|ch| ch.is_ascii_digit())
392        || !parts[2].chars().all(|ch| ch.is_ascii_digit())
393    {
394        return Err(CoreError::validation(format!(
395            "Archived change directory has unexpected format: {archived_name}"
396        )));
397    }
398    Ok(parts[3].to_string())
399}