Skip to main content

kcl_async/
lib.rs

1use checkpoint::Checkpointer;
2use message::input::Message as MessageIn;
3use message::output::Message as MessageOut;
4use processor::Processor;
5use transport::Transport;
6
7pub mod checkpoint;
8pub mod message;
9pub mod processor;
10pub mod transport;
11
12#[derive(Debug, thiserror::Error)]
13pub enum RunError<IoError, ProcessorError> {
14    UnexpectedMessage(MessageIn),
15
16    #[error(transparent)]
17    IoError(IoError),
18
19    #[error(transparent)]
20    ProcessorError(ProcessorError),
21}
22
23pub async fn run<T: Transport + Send, P: Processor<T>>(
24    mut transport: T,
25    mut processor: P,
26) -> Result<(), RunError<T::Error, P::Error>> {
27    loop {
28        let msg = transport.read_message().await.map_err(RunError::IoError)?;
29        let msg_id = msg.id();
30
31        {
32            // Handle message
33
34            let mut checkpointer = Checkpointer::new(&mut transport);
35
36            match msg {
37                MessageIn::Initialize(m) => processor.initialize(m).await,
38                MessageIn::ProcessRecords(m) => {
39                    processor.process_records(m, &mut checkpointer).await
40                }
41                MessageIn::Shutdown(m) => processor.shutdown(m, &mut checkpointer).await,
42                MessageIn::ShutdownRequested(m) => {
43                    processor.shutdown_requested(m, &mut checkpointer).await
44                }
45                MessageIn::LeaseLost(m) => processor.lease_lost(m).await,
46                MessageIn::ShardEnded(m) => processor.shard_ended(m).await,
47
48                msg => {
49                    return Err(RunError::UnexpectedMessage(msg));
50                }
51            }
52            .map_err(RunError::ProcessorError)?;
53        }
54
55        {
56            // Acknowledge message
57
58            let response = MessageOut::Status(message::output::StatusMessage {
59                response_for: msg_id.into(),
60            });
61
62            transport
63                .write_message(&response)
64                .await
65                .map_err(RunError::IoError)?;
66        }
67    }
68}