batch_mode_batch_reconciliation/
reconcile_unprocessed.rs

1// ---------------- [ File: batch-mode-batch-reconciliation/src/reconcile_unprocessed.rs ]
2crate::ix!();
3
4/// Trait describing how a `BatchFileTriple` can be reconciled if unprocessed.
5#[async_trait]
6pub trait ReconcileUnprocessed<E> {
7    async fn reconcile_unprocessed(
8        &mut self,
9        client:                 &dyn LanguageModelClientInterface<E>,
10        expected_content_type:  &ExpectedContentType,
11        process_output_file_fn: &BatchWorkflowProcessOutputFileFn,
12        process_error_file_fn:  &BatchWorkflowProcessErrorFileFn,
13    ) -> Result<(), E>;
14}
15
16/* 
17   TYPE ALIASES: 
18   They define the EXACT function pointer signature we require.  
19   Notice that each parameter is `'a`, and the second parameter is 
20   `&'a (dyn BatchWorkspaceInterface + 'a)`, not just `&'a dyn BatchWorkspaceInterface`.
21*/
22
23pub type BatchWorkflowProcessOutputFileFn = for<'a> fn(
24    &'a BatchFileTriple,
25    &'a (dyn BatchWorkspaceInterface + 'a),
26    &'a ExpectedContentType,
27) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>>;
28
29pub type BatchWorkflowProcessErrorFileFn = for<'a> fn(
30    &'a BatchFileTriple,
31    &'a [BatchErrorFileProcessingOperation],
32) -> Pin<Box<dyn Future<Output = Result<(), BatchErrorProcessingError>> + Send + 'a>>;
33
34#[cfg(test)]
35mod reconcile_unprocessed_tests {
36    use super::*;
37    use std::{
38        future::Future,
39        pin::Pin,
40        fs,
41    };
42
43    fn mock_process_output<'a>(
44        _triple: &'a BatchFileTriple,
45        _workspace: &'a (dyn BatchWorkspaceInterface + 'a),
46        _ect: &'a ExpectedContentType,
47    ) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>> {
48        Box::pin(async move {
49            debug!("mock_process_output called");
50            Ok(())
51        })
52    }
53
54    fn mock_process_error<'a>(
55        _triple: &'a BatchFileTriple,
56        _ops: &'a [BatchErrorFileProcessingOperation],
57    ) -> Pin<Box<dyn Future<Output = Result<(), BatchErrorProcessingError>> + Send + 'a>> {
58        Box::pin(async move {
59            debug!("mock_process_error called");
60            Ok(())
61        })
62    }
63
64    const MOCK_PROCESS_OUTPUT: BatchWorkflowProcessOutputFileFn = mock_process_output;
65    const MOCK_PROCESS_ERROR:  BatchWorkflowProcessErrorFileFn  = mock_process_error;
66
67    #[traced_test]
68    async fn test_reconcile_unprocessed_input_only() {
69        // Use a real tokio runtime so that tokio::fs calls won't panic.
70        let workspace: Arc<dyn BatchWorkspaceInterface> = BatchWorkspace::new_temp()
71            .await
72            .expect("expected ephemeral workspace");
73
74        // Build a triple with index=42
75        let mut triple = BatchFileTriple::new_for_test_with_workspace(workspace.clone());
76        triple.set_index(BatchIndex::from(42u64));
77
78        // Write input file in the workspace location:
79        let input_path = workspace.input_filename(triple.index());
80        fs::write(&input_path, b"fake input").unwrap();
81        triple.set_input_path(Some(input_path.to_string_lossy().to_string().into()));
82
83        // Also provide a metadata file in the correct workspace location.
84        let meta_path = workspace.metadata_filename(triple.index());
85        fs::write(
86            &meta_path,
87            r#"{"batch_id":"some_mock_batch_id_for_42","input_file_id":"fake_input_file_id_42"}"#
88        ).unwrap();
89
90        // Mock client that eventually finishes with no output/error files
91        let client_mock = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
92            .build()
93            .unwrap();
94        client_mock.configure_inprogress_then_complete_with("some_mock_batch_id_for_42", false, false);
95
96        let client_mock = Arc::new(client_mock) as Arc<dyn LanguageModelClientInterface<MockBatchClientError>>;
97
98        let ect = ExpectedContentType::JsonLines;
99        let result = triple.reconcile_unprocessed(
100            client_mock.as_ref(),
101            &ect,
102            &MOCK_PROCESS_OUTPUT,
103            &MOCK_PROCESS_ERROR,
104        ).await;
105
106        // Should succeed, since there's just an input, no discovered output or error.
107        assert!(result.is_ok(), "Input-only triple with no online files should not fail");
108    }
109}