batch_mode_batch_reconciliation/
execute_reconciliation.rs

1// ---------------- [ File: src/execute_reconciliation.rs ]
2crate::ix!();
3
/// A single step that can be requested when a batch error file is processed.
///
/// A slice of these values is handed to the error-file processing callback,
/// which decides how to act on each requested step. Further variants can be
/// added here as new kinds of error-file handling are needed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BatchErrorFileProcessingOperation {
    /// Request that the errors found in the error file be logged.
    LogErrors,

    /// Request that the requests which failed be retried.
    RetryFailedRequests,
}
10
11pub async fn execute_reconciliation_operation<OutputF, ErrorF, OFut, EFut>(
12    triple:                 &mut BatchFileTriple,
13    client:                 &OpenAIClientHandle,
14    operation:              &BatchFileTripleReconciliationOperation,
15    expected_content_type:  &ExpectedContentType,
16    process_output_file_fn: &OutputF,
17    process_error_file_fn:  &ErrorF,
18
19) -> Result<Option<BatchFileReconciliationRecommendedCourseOfAction>, BatchReconciliationError>
20where
21    OutputF: Fn(&BatchFileTriple, &dyn BatchWorkspaceInterface, &ExpectedContentType) -> OFut + Send + Sync,
22    ErrorF:  Fn(&BatchFileTriple, &[BatchErrorFileProcessingOperation]) -> EFut + Send + Sync,
23    OFut:    Future<Output = Result<(), BatchOutputProcessingError>> + Send,
24    EFut:    Future<Output = Result<(), BatchErrorProcessingError>> + Send,
25{
26    let workspace = triple.workspace();
27
28    info!(
29        "executing reconciliation operation {:?} for batch {:#?}",
30        operation, triple
31    );
32
33    let mut new_recommended_actions = None;
34
35    use BatchFileTripleReconciliationOperation::*;
36
37    match operation {
38        EnsureInputRequestIdsMatchErrorRequestIds => {
39            triple.ensure_input_matches_error().await?;
40        }
41        EnsureInputRequestIdsMatchOutputRequestIds => {
42            triple.ensure_input_matches_output().await?;
43        }
44        EnsureInputRequestIdsMatchOutputRequestIdsCombinedWithErrorRequestIds => {
45            triple.ensure_input_matches_output_and_error().await?;
46        }
47        ProcessBatchErrorFile => {
48            let operations = vec![
49                BatchErrorFileProcessingOperation::LogErrors,
50                BatchErrorFileProcessingOperation::RetryFailedRequests,
51            ];
52            process_error_file_fn(triple, &operations).await?;
53        }
54        ProcessBatchOutputFile => {
55            process_output_file_fn(triple, &**workspace, expected_content_type).await?;
56        }
57        MoveBatchInputAndErrorToTheDoneDirectory => {
58            triple.move_input_and_error_to_done().await?;
59        }
60        MoveBatchInputAndOutputToTheDoneDirectory => {
61            triple.move_input_and_output_to_done().await?;
62        }
63        MoveBatchTripleToTheDoneDirectory => {
64            triple.move_all_to_done().await?;
65        }
66        CheckForBatchOutputAndErrorFileOnline => {
67            check_for_and_download_output_and_error_online(triple, client).await?;
68            new_recommended_actions = Some(recalculate_recommended_actions(triple)?);
69        }
70        RecalculateRecommendedCourseOfActionIfTripleChanged => {
71            new_recommended_actions = Some(recalculate_recommended_actions(triple)?);
72        }
73        _ => {
74            return Err(BatchReconciliationError::OperationNotImplemented {
75                operation: *operation,
76            });
77        }
78    }
79
80    Ok(new_recommended_actions)
81}