1use checkpoint::Checkpointer;
2use message::input::Message as MessageIn;
3use message::output::Message as MessageOut;
4use processor::Processor;
5use transport::Transport;
6
7pub mod checkpoint;
8pub mod message;
9pub mod processor;
10pub mod transport;
11
12#[derive(Debug, thiserror::Error)]
13pub enum RunError<IoError, ProcessorError> {
14 UnexpectedMessage(MessageIn),
15
16 #[error(transparent)]
17 IoError(IoError),
18
19 #[error(transparent)]
20 ProcessorError(ProcessorError),
21}
22
23pub async fn run<T: Transport + Send, P: Processor<T>>(
24 mut transport: T,
25 mut processor: P,
26) -> Result<(), RunError<T::Error, P::Error>> {
27 loop {
28 let msg = transport.read_message().await.map_err(RunError::IoError)?;
29 let msg_id = msg.id();
30
31 {
32 let mut checkpointer = Checkpointer::new(&mut transport);
35
36 match msg {
37 MessageIn::Initialize(m) => processor.initialize(m).await,
38 MessageIn::ProcessRecords(m) => {
39 processor.process_records(m, &mut checkpointer).await
40 }
41 MessageIn::Shutdown(m) => processor.shutdown(m, &mut checkpointer).await,
42 MessageIn::ShutdownRequested(m) => {
43 processor.shutdown_requested(m, &mut checkpointer).await
44 }
45 MessageIn::LeaseLost(m) => processor.lease_lost(m).await,
46 MessageIn::ShardEnded(m) => processor.shard_ended(m).await,
47
48 msg => {
49 return Err(RunError::UnexpectedMessage(msg));
50 }
51 }
52 .map_err(RunError::ProcessorError)?;
53 }
54
55 {
56 let response = MessageOut::Status(message::output::StatusMessage {
59 response_for: msg_id.into(),
60 });
61
62 transport
63 .write_message(&response)
64 .await
65 .map_err(RunError::IoError)?;
66 }
67 }
68}