Skip to main content

kimi_wire/
dispatch.rs

1//! Ready-made dispatch loop for wire protocol conversations.
2//!
3//! [`process_messages`](crate::dispatch::process_messages) runs an async loop that reads incoming messages,
4//! parses them, and delegates to a user-provided handler. Timeouts and
5//! parse errors are handled internally so the caller only has to implement
6//! business logic.
7
8use std::future::Future;
9
10use tracing::{debug, info, warn};
11
12use crate::client::WireClient;
13use crate::error::WireError;
14use crate::message::{parse_wire_message, WireMessage};
15
16/// A response to be sent back to the agent.
17#[derive(Debug)]
18pub struct WireResponse {
19    /// The JSON-RPC request id this response answers.
20    pub id: String,
21    /// The JSON-RPC result payload.
22    pub result: serde_json::Value,
23}
24
25/// Process wire messages in a loop, handling events and requests.
26///
27/// The loop exits when:
28/// * the underlying transport returns [`WireError::StreamClosed`];
29/// * the handler returns an `Err`;
30/// * the client encounters an unrecoverable I/O error.
31///
32/// Parse errors and unknown message types are logged and skipped — the loop
33/// keeps running.
34///
35/// # Errors
36///
37/// Returns [`WireError`] if reading from the client fails, the handler returns
38/// an error, or sending a response fails.
39///
40/// # Example
41///
42/// ```no_run
43/// # async fn example<C: kimi_wire::WireClient>(client: &mut C) -> Result<(), kimi_wire::WireError> {
44/// use kimi_wire::dispatch::{process_messages, WireResponse};
45/// use kimi_wire::message::WireMessage;
46///
47/// process_messages(client, |msg| async move {
48///     match msg {
49///         WireMessage::Request(req) => {
50///             // handle request...
51///             Ok(Some(WireResponse { id: req.id, result: serde_json::json!(null) }))
52///         }
53///         _ => Ok(None),
54///     }
55/// }).await
56/// # }
57pub async fn process_messages<C, F, Fut>(client: &mut C, mut handler: F) -> Result<(), WireError>
58where
59    C: WireClient,
60    F: FnMut(WireMessage) -> Fut,
61    Fut: Future<Output = Result<Option<WireResponse>, WireError>>,
62{
63    info!("starting process_messages loop");
64    loop {
65        let raw = match client.read_raw_message().await {
66            Ok(msg) => msg,
67            Err(e) => {
68                warn!(error = %e, "Wire message error, exiting loop");
69                break;
70            }
71        };
72
73        let msg = match parse_wire_message(raw) {
74            Ok(msg) => {
75                debug!("parsed wire message");
76                msg
77            }
78            Err(e) => {
79                warn!(error = %e, "Failed to parse wire message, skipping");
80                continue;
81            }
82        };
83
84        if let Some(response) = handler(msg).await? {
85            client.send_response(&response.id, &response.result).await?;
86        }
87    }
88    info!("process_messages loop exited");
89    Ok(())
90}