// batch_mode_batch_reconciliation/reconcile_unprocessed.rs
crate::ix!();
3
4#[async_trait]
6pub trait ReconcileUnprocessed<E> {
7 async fn reconcile_unprocessed(
8 &mut self,
9 client: &dyn LanguageModelClientInterface<E>,
10 expected_content_type: &ExpectedContentType,
11 process_output_file_fn: &BatchWorkflowProcessOutputFileFn,
12 process_error_file_fn: &BatchWorkflowProcessErrorFileFn,
13 ) -> Result<(), E>;
14}
15
16pub type BatchWorkflowProcessOutputFileFn = for<'a> fn(
24 &'a BatchFileTriple,
25 &'a (dyn BatchWorkspaceInterface + 'a),
26 &'a ExpectedContentType,
27) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>>;
28
29pub type BatchWorkflowProcessErrorFileFn = for<'a> fn(
30 &'a BatchFileTriple,
31 &'a [BatchErrorFileProcessingOperation],
32) -> Pin<Box<dyn Future<Output = Result<(), BatchErrorProcessingError>> + Send + 'a>>;
33
#[cfg(test)]
mod reconcile_unprocessed_tests {
    use super::*;
    use std::{
        future::Future,
        pin::Pin,
        fs,
    };

    /// Stand-in output-file callback: ignores all arguments, logs that it was
    /// called, and succeeds.
    fn mock_process_output<'a>(
        _triple: &'a BatchFileTriple,
        _workspace: &'a (dyn BatchWorkspaceInterface + 'a),
        _ect: &'a ExpectedContentType,
    ) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>> {
        Box::pin(async move {
            debug!("mock_process_output called");
            Ok(())
        })
    }

    /// Stand-in error-file callback: ignores all arguments, logs that it was
    /// called, and succeeds.
    fn mock_process_error<'a>(
        _triple: &'a BatchFileTriple,
        _ops: &'a [BatchErrorFileProcessingOperation],
    ) -> Pin<Box<dyn Future<Output = Result<(), BatchErrorProcessingError>> + Send + 'a>> {
        Box::pin(async move {
            debug!("mock_process_error called");
            Ok(())
        })
    }

    // Binding the mocks to the alias types checks at compile time that their
    // signatures match what `reconcile_unprocessed` expects.
    const MOCK_PROCESS_OUTPUT: BatchWorkflowProcessOutputFileFn = mock_process_output;
    const MOCK_PROCESS_ERROR: BatchWorkflowProcessErrorFileFn = mock_process_error;

    /// A triple that only has an input file (plus a metadata file) on disk
    /// must reconcile without returning an error.
    // NOTE(review): assumes the crate's `traced_test` attribute can drive an
    // `async fn` by itself (no separate runtime attribute is present) -- confirm.
    #[traced_test]
    async fn test_reconcile_unprocessed_input_only() {
        let workspace: Arc<dyn BatchWorkspaceInterface> = BatchWorkspace::new_temp()
            .await
            .expect("expected ephemeral workspace");

        let mut triple = BatchFileTriple::new_for_test_with_workspace(workspace.clone());
        triple.set_index(BatchIndex::from(42u64));

        // Create the input file at the location the workspace assigns to this
        // index and register that path on the triple.
        let input_path = workspace.input_filename(triple.index());
        fs::write(&input_path, b"fake input").unwrap();
        triple.set_input_path(Some(input_path.to_string_lossy().to_string().into()));

        // Metadata file naming the batch id that the mock client is
        // configured with below.
        let meta_path = workspace.metadata_filename(triple.index());
        fs::write(
            &meta_path,
            r#"{"batch_id":"some_mock_batch_id_for_42","input_file_id":"fake_input_file_id_42"}"#
        ).unwrap();

        let client_mock = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();
        // NOTE(review): the two `false` flags presumably mean the finished
        // batch exposes neither an output nor an error file -- confirm
        // against the mock's definition.
        client_mock.configure_inprogress_then_complete_with("some_mock_batch_id_for_42", false, false);

        let client_mock = Arc::new(client_mock) as Arc<dyn LanguageModelClientInterface<MockBatchClientError>>;

        let ect = ExpectedContentType::JsonLines;
        let result = triple.reconcile_unprocessed(
            client_mock.as_ref(),
            &ect,
            &MOCK_PROCESS_OUTPUT,
            &MOCK_PROCESS_ERROR,
        ).await;

        assert!(result.is_ok(), "Input-only triple with no online files should not fail");
    }
}