kimi_wire/dispatch.rs
1//! Ready-made dispatch loop for wire protocol conversations.
2//!
3//! [`process_messages`](crate::dispatch::process_messages) runs an async loop that reads incoming messages,
4//! parses them, and delegates to a user-provided handler. Timeouts and
5//! parse errors are handled internally so the caller only has to implement
6//! business logic.
7
8use std::future::Future;
9
10use tracing::{debug, info, warn};
11
12use crate::client::WireClient;
13use crate::error::WireError;
14use crate::message::{parse_wire_message, WireMessage};
15
16/// A response to be sent back to the agent.
17#[derive(Debug)]
18pub struct WireResponse {
19 /// The JSON-RPC request id this response answers.
20 pub id: String,
21 /// The JSON-RPC result payload.
22 pub result: serde_json::Value,
23}
24
25/// Process wire messages in a loop, handling events and requests.
26///
27/// The loop exits when:
28/// * the underlying transport returns [`WireError::StreamClosed`];
29/// * the handler returns an `Err`;
30/// * the client encounters an unrecoverable I/O error.
31///
32/// Parse errors and unknown message types are logged and skipped — the loop
33/// keeps running.
34///
35/// # Example
36///
37/// ```no_run
38/// # async fn example<C: kimi_wire::WireClient>(client: &mut C) -> Result<(), kimi_wire::WireError> {
39/// use kimi_wire::dispatch::{process_messages, WireResponse};
40/// use kimi_wire::message::WireMessage;
41///
42/// process_messages(client, |msg| async move {
43/// match msg {
44/// WireMessage::Request(req) => {
45/// // handle request...
46/// Ok(Some(WireResponse { id: req.id, result: serde_json::json!(null) }))
47/// }
48/// _ => Ok(None),
49/// }
50/// }).await
51/// # }
52/// ```
53pub async fn process_messages<C, F, Fut>(client: &mut C, mut handler: F) -> Result<(), WireError>
54where
55 C: WireClient,
56 F: FnMut(WireMessage) -> Fut,
57 Fut: Future<Output = Result<Option<WireResponse>, WireError>>,
58{
59 info!("starting process_messages loop");
60 loop {
61 let raw = match client.read_raw_message().await {
62 Ok(msg) => msg,
63 Err(e) => {
64 warn!(error = %e, "Wire message error, exiting loop");
65 break;
66 }
67 };
68
69 let msg = match parse_wire_message(raw) {
70 Ok(msg) => {
71 debug!("parsed wire message");
72 msg
73 }
74 Err(e) => {
75 warn!(error = %e, "Failed to parse wire message, skipping");
76 continue;
77 }
78 };
79
80 if let Some(response) = handler(msg).await? {
81 client.send_response(&response.id, &response.result).await?;
82 }
83 }
84 info!("process_messages loop exited");
85 Ok(())
86}