Skip to main content

kimi_wire/
dispatch.rs

1//! Ready-made dispatch loop for wire protocol conversations.
2//!
3//! [`process_messages`](crate::dispatch::process_messages) runs an async loop that reads incoming messages,
4//! parses them, and delegates to a user-provided handler. Timeouts and
5//! parse errors are handled internally so the caller only has to implement
6//! business logic.
7
8use std::future::Future;
9
10use tracing::{debug, info, warn};
11
12use crate::client::WireClient;
13use crate::error::WireError;
14use crate::message::{parse_wire_message, WireMessage};
15
16/// A response to be sent back to the agent.
17#[derive(Debug)]
18pub struct WireResponse {
19    /// The JSON-RPC request id this response answers.
20    pub id: String,
21    /// The JSON-RPC result payload.
22    pub result: serde_json::Value,
23}
24
25/// Process wire messages in a loop, handling events and requests.
26///
27/// The loop exits when:
28/// * the underlying transport returns [`WireError::StreamClosed`];
29/// * the handler returns an `Err`;
30/// * the client encounters an unrecoverable I/O error.
31///
32/// Parse errors and unknown message types are logged and skipped — the loop
33/// keeps running.
34///
35/// # Example
36///
37/// ```no_run
38/// # async fn example<C: kimi_wire::WireClient>(client: &mut C) -> Result<(), kimi_wire::WireError> {
39/// use kimi_wire::dispatch::{process_messages, WireResponse};
40/// use kimi_wire::message::WireMessage;
41///
42/// process_messages(client, |msg| async move {
43///     match msg {
44///         WireMessage::Request(req) => {
45///             // handle request...
46///             Ok(Some(WireResponse { id: req.id, result: serde_json::json!(null) }))
47///         }
48///         _ => Ok(None),
49///     }
50/// }).await
51/// # }
52/// ```
53pub async fn process_messages<C, F, Fut>(client: &mut C, mut handler: F) -> Result<(), WireError>
54where
55    C: WireClient,
56    F: FnMut(WireMessage) -> Fut,
57    Fut: Future<Output = Result<Option<WireResponse>, WireError>>,
58{
59    info!("starting process_messages loop");
60    loop {
61        let raw = match client.read_raw_message().await {
62            Ok(msg) => msg,
63            Err(e) => {
64                warn!(error = %e, "Wire message error, exiting loop");
65                break;
66            }
67        };
68
69        let msg = match parse_wire_message(raw) {
70            Ok(msg) => {
71                debug!("parsed wire message");
72                msg
73            }
74            Err(e) => {
75                warn!(error = %e, "Failed to parse wire message, skipping");
76                continue;
77            }
78        };
79
80        if let Some(response) = handler(msg).await? {
81            client.send_response(&response.id, &response.result).await?;
82        }
83    }
84    info!("process_messages loop exited");
85    Ok(())
86}