Skip to main content

agecli_skill_protocol/
writer.rs

1//! Write skill messages to a stream.
2
3use super::message::*;
4use serde::Serialize;
5use tokio::io::{AsyncWrite, AsyncWriteExt};
6
/// Framed writer for skill protocol messages.
///
/// Owns an output stream of type `W` and writes binary-framed skill
/// messages to it.
pub struct SkillMessageWriter<W> {
    /// Underlying stream that receives the encoded messages.
    writer: W,
}
11
12impl<W: AsyncWrite + Unpin> SkillMessageWriter<W> {
13    pub fn new(writer: W) -> Self {
14        Self { writer }
15    }
16
17    /// Write a raw skill message to the stream.
18    async fn write_message(&mut self, msg: &SkillMessage) -> std::io::Result<()> {
19        let type_bytes = (msg.msg_type as u32).to_be_bytes();
20        let len_bytes = (msg.payload.len() as u32).to_be_bytes();
21
22        self.writer.write_all(&type_bytes).await?;
23        self.writer.write_all(&len_bytes).await?;
24        if !msg.payload.is_empty() {
25            self.writer.write_all(&msg.payload).await?;
26        }
27        self.writer.flush().await?;
28        Ok(())
29    }
30
31    /// Write a typed payload as a message.
32    async fn write_typed<T: Serialize>(
33        &mut self,
34        msg_type: SkillMessageType,
35        payload: &T,
36    ) -> std::io::Result<()> {
37        let json = serde_json::to_vec(payload)
38            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
39        self.write_message(&SkillMessage::new(msg_type, json)).await
40    }
41
42    // ── Server → Skill messages ──────────────────────────────────────────
43
44    pub async fn write_execute(&mut self, payload: &ExecutePayload) -> std::io::Result<()> {
45        self.write_typed(SkillMessageType::Execute, payload).await
46    }
47
48    pub async fn write_cancel(&mut self, execution_id: &str) -> std::io::Result<()> {
49        let payload = execution_id.as_bytes().to_vec();
50        self.write_message(&SkillMessage::new(SkillMessageType::Cancel, payload))
51            .await
52    }
53
54    pub async fn write_stdin_data(&mut self, payload: &StdinDataPayload) -> std::io::Result<()> {
55        self.write_typed(SkillMessageType::StdinData, payload).await
56    }
57
58    pub async fn write_resize(&mut self, payload: &ResizePayload) -> std::io::Result<()> {
59        self.write_typed(SkillMessageType::Resize, payload).await
60    }
61
62    pub async fn write_signal(&mut self, payload: &SignalPayload) -> std::io::Result<()> {
63        self.write_typed(SkillMessageType::Signal, payload).await
64    }
65
66    pub async fn write_start_session(
67        &mut self,
68        payload: &StartSessionPayload,
69    ) -> std::io::Result<()> {
70        self.write_typed(SkillMessageType::StartSession, payload)
71            .await
72    }
73
74    pub async fn write_shutdown(&mut self) -> std::io::Result<()> {
75        self.write_message(&SkillMessage::new(SkillMessageType::Shutdown, vec![]))
76            .await
77    }
78
79    // ── Skill → Server messages ──────────────────────────────────────────
80
81    pub async fn write_ack(&mut self, payload: &AckPayload) -> std::io::Result<()> {
82        self.write_typed(SkillMessageType::Ack, payload).await
83    }
84
85    pub async fn write_stdout_chunk(&mut self, payload: &DataChunkPayload) -> std::io::Result<()> {
86        self.write_typed(SkillMessageType::StdoutChunk, payload)
87            .await
88    }
89
90    pub async fn write_stderr_chunk(&mut self, payload: &DataChunkPayload) -> std::io::Result<()> {
91        self.write_typed(SkillMessageType::StderrChunk, payload)
92            .await
93    }
94
95    pub async fn write_progress(&mut self, payload: &ProgressPayload) -> std::io::Result<()> {
96        self.write_typed(SkillMessageType::Progress, payload).await
97    }
98
99    pub async fn write_completed(&mut self, payload: &CompletedPayload) -> std::io::Result<()> {
100        self.write_typed(SkillMessageType::Completed, payload).await
101    }
102
103    pub async fn write_error(&mut self, payload: &ErrorPayload) -> std::io::Result<()> {
104        self.write_typed(SkillMessageType::Error, payload).await
105    }
106
107    pub async fn write_session_started(
108        &mut self,
109        payload: &SessionStartedPayload,
110    ) -> std::io::Result<()> {
111        self.write_typed(SkillMessageType::SessionStarted, payload)
112            .await
113    }
114
115    // ── Proxy messages ───────────────────────────────────────────────────
116
117    pub async fn write_proxy_submit(
118        &mut self,
119        payload: &ProxySubmitPayload,
120    ) -> std::io::Result<()> {
121        self.write_typed(SkillMessageType::ProxySubmit, payload)
122            .await
123    }
124
125    pub async fn write_proxy_cancel(&mut self, proxy_id: &str) -> std::io::Result<()> {
126        let payload = proxy_id.as_bytes().to_vec();
127        self.write_message(&SkillMessage::new(SkillMessageType::ProxyCancel, payload))
128            .await
129    }
130
131    pub async fn write_proxy_stdout_chunk(
132        &mut self,
133        payload: &ProxyChunkPayload,
134    ) -> std::io::Result<()> {
135        self.write_typed(SkillMessageType::ProxyStdoutChunk, payload)
136            .await
137    }
138
139    pub async fn write_proxy_stderr_chunk(
140        &mut self,
141        payload: &ProxyChunkPayload,
142    ) -> std::io::Result<()> {
143        self.write_typed(SkillMessageType::ProxyStderrChunk, payload)
144            .await
145    }
146
147    pub async fn write_proxy_completed(
148        &mut self,
149        payload: &ProxyCompletedPayload,
150    ) -> std::io::Result<()> {
151        self.write_typed(SkillMessageType::ProxyCompleted, payload)
152            .await
153    }
154
155    pub async fn write_proxy_rejected(
156        &mut self,
157        payload: &ProxyRejectedPayload,
158    ) -> std::io::Result<()> {
159        self.write_typed(SkillMessageType::ProxyRejected, payload)
160            .await
161    }
162}
163
164#[cfg(test)]
165#[path = "writer_tests.rs"]
166mod tests;