batch_mode_batch_reconciliation/
execute_reconciliation.rs1crate::ix!();
3
4#[derive(Debug, Clone, PartialEq, Eq)]
5pub enum BatchErrorFileProcessingOperation {
6 LogErrors,
7 RetryFailedRequests,
8 }
10
11pub async fn execute_reconciliation_operation<OutputF, ErrorF, OFut, EFut>(
12 triple: &mut BatchFileTriple,
13 client: &OpenAIClientHandle,
14 operation: &BatchFileTripleReconciliationOperation,
15 expected_content_type: &ExpectedContentType,
16 process_output_file_fn: &OutputF,
17 process_error_file_fn: &ErrorF,
18
19) -> Result<Option<BatchFileReconciliationRecommendedCourseOfAction>, BatchReconciliationError>
20where
21 OutputF: Fn(&BatchFileTriple, &dyn BatchWorkspaceInterface, &ExpectedContentType) -> OFut + Send + Sync,
22 ErrorF: Fn(&BatchFileTriple, &[BatchErrorFileProcessingOperation]) -> EFut + Send + Sync,
23 OFut: Future<Output = Result<(), BatchOutputProcessingError>> + Send,
24 EFut: Future<Output = Result<(), BatchErrorProcessingError>> + Send,
25{
26 let workspace = triple.workspace();
27
28 info!(
29 "executing reconciliation operation {:?} for batch {:#?}",
30 operation, triple
31 );
32
33 let mut new_recommended_actions = None;
34
35 use BatchFileTripleReconciliationOperation::*;
36
37 match operation {
38 EnsureInputRequestIdsMatchErrorRequestIds => {
39 triple.ensure_input_matches_error().await?;
40 }
41 EnsureInputRequestIdsMatchOutputRequestIds => {
42 triple.ensure_input_matches_output().await?;
43 }
44 EnsureInputRequestIdsMatchOutputRequestIdsCombinedWithErrorRequestIds => {
45 triple.ensure_input_matches_output_and_error().await?;
46 }
47 ProcessBatchErrorFile => {
48 let operations = vec![
49 BatchErrorFileProcessingOperation::LogErrors,
50 BatchErrorFileProcessingOperation::RetryFailedRequests,
51 ];
52 process_error_file_fn(triple, &operations).await?;
53 }
54 ProcessBatchOutputFile => {
55 process_output_file_fn(triple, &**workspace, expected_content_type).await?;
56 }
57 MoveBatchInputAndErrorToTheDoneDirectory => {
58 triple.move_input_and_error_to_done().await?;
59 }
60 MoveBatchInputAndOutputToTheDoneDirectory => {
61 triple.move_input_and_output_to_done().await?;
62 }
63 MoveBatchTripleToTheDoneDirectory => {
64 triple.move_all_to_done().await?;
65 }
66 CheckForBatchOutputAndErrorFileOnline => {
67 check_for_and_download_output_and_error_online(triple, client).await?;
68 new_recommended_actions = Some(recalculate_recommended_actions(triple)?);
69 }
70 RecalculateRecommendedCourseOfActionIfTripleChanged => {
71 new_recommended_actions = Some(recalculate_recommended_actions(triple)?);
72 }
73 _ => {
74 return Err(BatchReconciliationError::OperationNotImplemented {
75 operation: *operation,
76 });
77 }
78 }
79
80 Ok(new_recommended_actions)
81}