kimi_wire/dispatch.rs
1//! Ready-made dispatch loop for wire protocol conversations.
2//!
3//! [`process_messages`](crate::dispatch::process_messages) runs an async loop that reads incoming messages,
4//! parses them, and delegates to a user-provided handler. Timeouts and
5//! parse errors are handled internally so the caller only has to implement
6//! business logic.
7
8use std::future::Future;
9
10use tracing::{debug, info, warn};
11
12use crate::client::WireClient;
13use crate::error::WireError;
14use crate::message::{parse_wire_message, WireMessage};
15
16/// A response to be sent back to the agent.
17#[derive(Debug)]
18pub struct WireResponse {
19 /// The JSON-RPC request id this response answers.
20 pub id: String,
21 /// The JSON-RPC result payload.
22 pub result: serde_json::Value,
23}
24
25/// Process wire messages in a loop, handling events and requests.
26///
27/// The loop exits when:
28/// * the underlying transport returns [`WireError::StreamClosed`];
29/// * the handler returns an `Err`;
30/// * the client encounters an unrecoverable I/O error.
31///
32/// Parse errors and unknown message types are logged and skipped — the loop
33/// keeps running.
34///
35/// # Errors
36///
37/// Returns [`WireError`] if reading from the client fails, the handler returns
38/// an error, or sending a response fails.
39///
40/// # Example
41///
42/// ```no_run
43/// # async fn example<C: kimi_wire::WireClient>(client: &mut C) -> Result<(), kimi_wire::WireError> {
44/// use kimi_wire::dispatch::{process_messages, WireResponse};
45/// use kimi_wire::message::WireMessage;
46///
47/// process_messages(client, |msg| async move {
48/// match msg {
49/// WireMessage::Request(req) => {
50/// // handle request...
51/// Ok(Some(WireResponse { id: req.id, result: serde_json::json!(null) }))
52/// }
53/// _ => Ok(None),
54/// }
55/// }).await
56/// # }
57pub async fn process_messages<C, F, Fut>(client: &mut C, mut handler: F) -> Result<(), WireError>
58where
59 C: WireClient,
60 F: FnMut(WireMessage) -> Fut,
61 Fut: Future<Output = Result<Option<WireResponse>, WireError>>,
62{
63 info!("starting process_messages loop");
64 loop {
65 let raw = match client.read_raw_message().await {
66 Ok(msg) => msg,
67 Err(e) => {
68 warn!(error = %e, "Wire message error, exiting loop");
69 break;
70 }
71 };
72
73 let msg = match parse_wire_message(raw) {
74 Ok(msg) => {
75 debug!("parsed wire message");
76 msg
77 }
78 Err(e) => {
79 warn!(error = %e, "Failed to parse wire message, skipping");
80 continue;
81 }
82 };
83
84 if let Some(response) = handler(msg).await? {
85 client.send_response(&response.id, &response.result).await?;
86 }
87 }
88 info!("process_messages loop exited");
89 Ok(())
90}