// batch_mode_batch_reconciliation/reconcile_unprocessed_batch_triple.rs
crate::ix!();

4#[async_trait]
5impl<E> ReconcileUnprocessed<E> for BatchFileTriple
6where E
7: From<BatchReconciliationError>
8+ From<BatchDownloadError>
9+ From<BatchErrorProcessingError>
10+ From<BatchMetadataError>
11+ From<BatchOutputProcessingError>
12+ From<BatchValidationError>
13+ From<FileMoveError>
14+ From<OpenAIClientError>
15+ From<std::io::Error>
16+ Display
17+ Debug
18+ Send
19+ Sync
20{
21 async fn reconcile_unprocessed(
22 &mut self,
23 client: &dyn LanguageModelClientInterface<E>,
24 expected_content_type: &ExpectedContentType,
25 process_output_file_fn: &BatchWorkflowProcessOutputFileFn, process_error_file_fn: &BatchWorkflowProcessErrorFileFn,
27 ) -> Result<(), E>
28 {
29 info!("Attempting to reconcile unprocessed batch triple {:?}", self.index());
30
31 let actions = BatchFileReconciliationRecommendedCourseOfAction::try_from(&*self);
32 if let Err(e) = actions {
33 error!("Error determining actions for batch {:?}: {:?}", self.index(), e);
34 return Ok(());
35 }
36
37 let mut actions = actions.unwrap();
38 info!(
39 "Reconciliation actions for batch triple {:?}: {:#?}",
40 self.index(),
41 actions
42 );
43
44 loop {
45 let steps = actions.steps();
46 let mut hit_error = false;
47 let mut errors = vec![];
48 let mut updated_actions = false;
49
50 'steps: for action in steps {
51 debug!("Performing reconciliation step: {:?}", action);
52
53 match self.execute_reconciliation_operation(
54 client,
55 action,
56 expected_content_type,
57 process_output_file_fn,
58 process_error_file_fn
59 ).await {
60 Ok(Some(new_actions)) => {
61 if actions != new_actions {
62 actions = new_actions;
63 updated_actions = true;
64 debug!("Actions changed; recalculating steps");
65 break 'steps;
66 }
67 },
68 Ok(None) => {
69 trace!("No follow-up actions from step {:?}", action);
70 },
71 Err(e) => {
72 hit_error = true;
73 error!(
74 "Error applying batch action {:?} to reconcile batch {:?}: {:?}",
75 action,
76 self.index(),
77 e
78 );
79 errors.push((action.clone(), e));
80 }
81 }
82 }
83
84 if updated_actions {
85 continue;
87 }
88
89 if !hit_error {
90 info!(
91 "Successfully reconciled batch triple {:?} with final actions {:#?}",
92 self.index(),
93 actions
94 );
95 return Ok(());
96 } else {
97 error!("Failed to reconcile batch triple {:?} due to errors.", self.index());
98 for error in errors {
99 error!("{:#?}",error);
100 }
101 return Err(BatchReconciliationError::ReconciliationFailed {
102 index: self.index().clone(),
103 }.into());
104 }
105 }
106 }
107}