batch_mode_batch_reconciliation/
reconcile_unprocessed_batch_triple.rs

1// ---------------- [ File: batch-mode-batch-reconciliation/src/reconcile_unprocessed_batch_triple.rs ]
2crate::ix!();
3
4#[async_trait]
5impl<E> ReconcileUnprocessed<E> for BatchFileTriple
6where E
7: From<BatchReconciliationError> 
8+ From<BatchDownloadError> 
9+ From<BatchErrorProcessingError>
10+ From<BatchMetadataError> 
11+ From<BatchOutputProcessingError>
12+ From<BatchValidationError>
13+ From<FileMoveError>
14+ From<OpenAIClientError> 
15+ From<std::io::Error>
16+ Display
17+ Debug
18+ Send
19+ Sync
20{
21    async fn reconcile_unprocessed(
22        &mut self,
23        client:                 &dyn LanguageModelClientInterface<E>,
24        expected_content_type:  &ExpectedContentType,
25        process_output_file_fn: &BatchWorkflowProcessOutputFileFn,   // our new type alias
26        process_error_file_fn:  &BatchWorkflowProcessErrorFileFn,
27    ) -> Result<(), E>
28    {
29        info!("Attempting to reconcile unprocessed batch triple {:?}", self.index());
30
31        let actions = BatchFileReconciliationRecommendedCourseOfAction::try_from(&*self);
32        if let Err(e) = actions {
33            error!("Error determining actions for batch {:?}: {:?}", self.index(), e);
34            return Ok(());
35        }
36
37        let mut actions = actions.unwrap();
38        info!(
39            "Reconciliation actions for batch triple {:?}: {:#?}",
40            self.index(),
41            actions
42        );
43
44        loop {
45            let steps = actions.steps();
46            let mut hit_error       = false;
47            let mut errors          = vec![];
48            let mut updated_actions = false;
49
50            'steps: for action in steps {
51                debug!("Performing reconciliation step: {:?}", action);
52
53                match self.execute_reconciliation_operation(
54                    client,
55                    action,
56                    expected_content_type,
57                    process_output_file_fn,
58                    process_error_file_fn
59                ).await {
60                    Ok(Some(new_actions)) => {
61                        if actions != new_actions {
62                            actions = new_actions;
63                            updated_actions = true;
64                            debug!("Actions changed; recalculating steps");
65                            break 'steps;
66                        }
67                    },
68                    Ok(None) => {
69                        trace!("No follow-up actions from step {:?}", action);
70                    },
71                    Err(e) => {
72                        hit_error = true;
73                        error!(
74                            "Error applying batch action {:?} to reconcile batch {:?}: {:?}",
75                            action,
76                            self.index(),
77                            e
78                        );
79                        errors.push((action.clone(), e));
80                    }
81                }
82            }
83
84            if updated_actions {
85                // If new actions got returned, we handle them in the next iteration.
86                continue;
87            }
88
89            if !hit_error {
90                info!(
91                    "Successfully reconciled batch triple {:?} with final actions {:#?}",
92                    self.index(),
93                    actions
94                );
95                return Ok(());
96            } else {
97                error!("Failed to reconcile batch triple {:?} due to errors.", self.index());
98                for error in errors {
99                    error!("{:#?}",error);
100                }
101                return Err(BatchReconciliationError::ReconciliationFailed {
102                    index:  self.index().clone(),
103                }.into());
104            }
105        }
106    }
107}