Skip to main content

zap/
server.rs

1//! ZAP server implementation
2//!
3//! Cap'n Proto RPC server for AI agent communication.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use zap::{Server, Config};
9//! use zap::server::{ToolHandler, ResourceHandler, PromptHandler, ToolDef};
10//! use std::collections::HashMap;
11//! use std::sync::Arc;
12//!
13//! struct MyToolHandler;
14//!
15//! impl ToolHandler for MyToolHandler {
16//!     fn list(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<ToolDef>> + Send + '_>> {
17//!         Box::pin(async {
18//!             vec![ToolDef {
19//!                 name: "echo".into(),
20//!                 description: "Echo input".into(),
21//!                 schema: b"{}".to_vec(),
22//!                 annotations: HashMap::new(),
23//!             }]
24//!         })
25//!     }
26//!
27//!     fn call(
28//!         &self,
29//!         _name: &str,
30//!         args: &[u8],
31//!         _metadata: HashMap<String, String>,
32//!     ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>, String>> + Send + '_>> {
33//!         let args = args.to_vec();
34//!         Box::pin(async move { Ok(args) })
35//!     }
36//! }
37//!
38//! #[tokio::main]
39//! async fn main() -> zap::Result<()> {
40//!     let mut server = Server::new(Config::default());
41//!     server.set_tool_handler(Arc::new(MyToolHandler));
42//!     server.run().await
43//! }
44//! ```
45
46use crate::zap_capnp::{
47    prompt_message, resource_stream, zap,
48};
49use crate::{Config, Error, Result};
50use capnp::capability::Promise;
51use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
52use futures::AsyncReadExt;
53use std::collections::HashMap;
54use std::sync::atomic::{AtomicU64, Ordering};
55use std::sync::Arc;
56use tokio::net::TcpListener;
57use tokio::task::LocalSet;
58
59/// Tool definition
60#[derive(Debug, Clone)]
61pub struct ToolDef {
62    pub name: String,
63    pub description: String,
64    pub schema: Vec<u8>,
65    pub annotations: HashMap<String, String>,
66}
67
68/// Resource definition
69#[derive(Debug, Clone)]
70pub struct ResourceDef {
71    pub uri: String,
72    pub name: String,
73    pub description: String,
74    pub mime_type: String,
75    pub annotations: HashMap<String, String>,
76}
77
78/// Resource content
79#[derive(Debug, Clone)]
80pub enum ResourceContentData {
81    Text(String),
82    Blob(Vec<u8>),
83}
84
85/// Resource content with metadata
86#[derive(Debug, Clone)]
87pub struct ResourceContentDef {
88    pub uri: String,
89    pub mime_type: String,
90    pub content: ResourceContentData,
91}
92
93/// Prompt definition
94#[derive(Debug, Clone)]
95pub struct PromptDef {
96    pub name: String,
97    pub description: String,
98    pub arguments: Vec<PromptArgument>,
99}
100
101/// Prompt argument
102#[derive(Debug, Clone)]
103pub struct PromptArgument {
104    pub name: String,
105    pub description: String,
106    pub required: bool,
107}
108
109/// Prompt message
110#[derive(Debug, Clone)]
111pub struct PromptMessage {
112    pub role: PromptRole,
113    pub content: PromptContent,
114}
115
116/// Prompt role
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub enum PromptRole {
119    User,
120    Assistant,
121    System,
122}
123
124/// Prompt content
125#[derive(Debug, Clone)]
126pub enum PromptContent {
127    Text(String),
128    Image { data: Vec<u8>, mime_type: String },
129    Resource(ResourceContentDef),
130}
131
132/// Tool handler trait
133///
134/// Implement this trait to handle tool operations.
135pub trait ToolHandler: Send + Sync + 'static {
136    /// List available tools
137    fn list(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<ToolDef>> + Send + '_>>;
138
139    /// Call a tool
140    fn call(
141        &self,
142        name: &str,
143        args: &[u8],
144        metadata: HashMap<String, String>,
145    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<Vec<u8>, String>> + Send + '_>>;
146}
147
148/// Resource handler trait
149///
150/// Implement this trait to handle resource operations.
151pub trait ResourceHandler: Send + Sync + 'static {
152    /// List available resources
153    fn list(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<ResourceDef>> + Send + '_>>;
154
155    /// Read a resource
156    fn read(
157        &self,
158        uri: &str,
159    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<ResourceContentDef, String>> + Send + '_>>;
160
161    /// Subscribe to resource updates (returns a stream receiver)
162    fn subscribe(
163        &self,
164        uri: &str,
165    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<tokio::sync::mpsc::Receiver<ResourceContentDef>, String>> + Send + '_>>;
166}
167
168/// Prompt handler trait
169///
170/// Implement this trait to handle prompt operations.
171pub trait PromptHandler: Send + Sync + 'static {
172    /// List available prompts
173    fn list(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<PromptDef>> + Send + '_>>;
174
175    /// Get a prompt with arguments
176    fn get(
177        &self,
178        name: &str,
179        args: HashMap<String, String>,
180    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<Vec<PromptMessage>, String>> + Send + '_>>;
181}
182
183/// Log handler trait
184pub trait LogHandler: Send + Sync + 'static {
185    fn log(&self, level: LogLevel, message: &str, data: &[u8]);
186}
187
188/// Log level
189#[derive(Debug, Clone, Copy, PartialEq, Eq)]
190pub enum LogLevel {
191    Debug,
192    Info,
193    Warn,
194    Error,
195}
196
197/// Default no-op tool handler
198pub struct NoopToolHandler;
199
200impl ToolHandler for NoopToolHandler {
201    fn list(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<ToolDef>> + Send + '_>> {
202        Box::pin(async { Vec::new() })
203    }
204
205    fn call(
206        &self,
207        _name: &str,
208        _args: &[u8],
209        _metadata: HashMap<String, String>,
210    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<Vec<u8>, String>> + Send + '_>> {
211        Box::pin(async { Err("no tool handler registered".to_string()) })
212    }
213}
214
215/// Default no-op resource handler
216pub struct NoopResourceHandler;
217
218impl ResourceHandler for NoopResourceHandler {
219    fn list(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<ResourceDef>> + Send + '_>> {
220        Box::pin(async { Vec::new() })
221    }
222
223    fn read(
224        &self,
225        _uri: &str,
226    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<ResourceContentDef, String>> + Send + '_>> {
227        Box::pin(async { Err("no resource handler registered".to_string()) })
228    }
229
230    fn subscribe(
231        &self,
232        _uri: &str,
233    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<tokio::sync::mpsc::Receiver<ResourceContentDef>, String>> + Send + '_>> {
234        Box::pin(async { Err("no resource handler registered".to_string()) })
235    }
236}
237
238/// Default no-op prompt handler
239pub struct NoopPromptHandler;
240
241impl PromptHandler for NoopPromptHandler {
242    fn list(&self) -> std::pin::Pin<Box<dyn std::future::Future<Output = Vec<PromptDef>> + Send + '_>> {
243        Box::pin(async { Vec::new() })
244    }
245
246    fn get(
247        &self,
248        _name: &str,
249        _args: HashMap<String, String>,
250    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<Vec<PromptMessage>, String>> + Send + '_>> {
251        Box::pin(async { Err("no prompt handler registered".to_string()) })
252    }
253}
254
255/// Default log handler (uses tracing)
256pub struct TracingLogHandler;
257
258impl LogHandler for TracingLogHandler {
259    fn log(&self, level: LogLevel, message: &str, _data: &[u8]) {
260        match level {
261            LogLevel::Debug => tracing::debug!("{}", message),
262            LogLevel::Info => tracing::info!("{}", message),
263            LogLevel::Warn => tracing::warn!("{}", message),
264            LogLevel::Error => tracing::error!("{}", message),
265        }
266    }
267}
268
269/// Server info
270#[derive(Debug, Clone)]
271pub struct ServerInfoDef {
272    pub name: String,
273    pub version: String,
274    pub tools: bool,
275    pub resources: bool,
276    pub prompts: bool,
277    pub logging: bool,
278}
279
280impl Default for ServerInfoDef {
281    fn default() -> Self {
282        Self {
283            name: "zap".to_string(),
284            version: crate::VERSION.to_string(),
285            tools: true,
286            resources: true,
287            prompts: true,
288            logging: true,
289        }
290    }
291}
292
293/// ZAP Server
294///
295/// A Cap'n Proto RPC server that implements the Zap interface.
296pub struct Server {
297    config: Config,
298    tool_handler: Arc<dyn ToolHandler>,
299    resource_handler: Arc<dyn ResourceHandler>,
300    prompt_handler: Arc<dyn PromptHandler>,
301    log_handler: Arc<dyn LogHandler>,
302    server_info: ServerInfoDef,
303}
304
305impl Server {
306    /// Create a new server with the given config
307    pub fn new(config: Config) -> Self {
308        Self {
309            config,
310            tool_handler: Arc::new(NoopToolHandler),
311            resource_handler: Arc::new(NoopResourceHandler),
312            prompt_handler: Arc::new(NoopPromptHandler),
313            log_handler: Arc::new(TracingLogHandler),
314            server_info: ServerInfoDef::default(),
315        }
316    }
317
318    /// Set the tool handler
319    pub fn set_tool_handler(&mut self, handler: Arc<dyn ToolHandler>) {
320        self.tool_handler = handler;
321    }
322
323    /// Set the resource handler
324    pub fn set_resource_handler(&mut self, handler: Arc<dyn ResourceHandler>) {
325        self.resource_handler = handler;
326    }
327
328    /// Set the prompt handler
329    pub fn set_prompt_handler(&mut self, handler: Arc<dyn PromptHandler>) {
330        self.prompt_handler = handler;
331    }
332
333    /// Set the log handler
334    pub fn set_log_handler(&mut self, handler: Arc<dyn LogHandler>) {
335        self.log_handler = handler;
336    }
337
338    /// Set server info
339    pub fn set_server_info(&mut self, info: ServerInfoDef) {
340        self.server_info = info;
341    }
342
343    /// Run the server
344    ///
345    /// This runs the Cap'n Proto RPC server on a LocalSet since the RPC system
346    /// uses Rc internally and is not Send.
347    pub async fn run(&self) -> Result<()> {
348        let addr = format!("{}:{}", self.config.listen, self.config.port);
349        tracing::info!("ZAP server listening on {}", addr);
350
351        let listener = TcpListener::bind(&addr).await?;
352
353        // Create shared state for all connections
354        let state = Arc::new(ServerState {
355            tool_handler: self.tool_handler.clone(),
356            resource_handler: self.resource_handler.clone(),
357            prompt_handler: self.prompt_handler.clone(),
358            log_handler: self.log_handler.clone(),
359            server_info: self.server_info.clone(),
360            client_count: AtomicU64::new(0),
361        });
362
363        // Use LocalSet for non-Send RPC futures
364        let local = LocalSet::new();
365
366        local.run_until(async move {
367            loop {
368                tokio::select! {
369                    result = listener.accept() => {
370                        match result {
371                            Ok((stream, addr)) => {
372                                let client_id = state.client_count.fetch_add(1, Ordering::SeqCst);
373                                tracing::debug!("client {} connected from {}", client_id, addr);
374
375                                let state = state.clone();
376                                // Use spawn_local for non-Send futures
377                                tokio::task::spawn_local(async move {
378                                    if let Err(e) = handle_connection(stream, state, client_id).await {
379                                        tracing::error!("client {} error: {}", client_id, e);
380                                    }
381                                    tracing::debug!("client {} disconnected", client_id);
382                                });
383                            }
384                            Err(e) => {
385                                tracing::error!("accept error: {}", e);
386                            }
387                        }
388                    }
389                    _ = tokio::signal::ctrl_c() => {
390                        tracing::info!("shutting down");
391                        break;
392                    }
393                }
394            }
395            Ok::<(), Error>(())
396        }).await?;
397
398        Ok(())
399    }
400
401    /// Run on an existing TCP listener (useful for tests)
402    pub async fn run_on_listener(&self, listener: TcpListener) -> Result<()> {
403        let state = Arc::new(ServerState {
404            tool_handler: self.tool_handler.clone(),
405            resource_handler: self.resource_handler.clone(),
406            prompt_handler: self.prompt_handler.clone(),
407            log_handler: self.log_handler.clone(),
408            server_info: self.server_info.clone(),
409            client_count: AtomicU64::new(0),
410        });
411
412        let local = LocalSet::new();
413
414        local.run_until(async move {
415            loop {
416                tokio::select! {
417                    result = listener.accept() => {
418                        match result {
419                            Ok((stream, addr)) => {
420                                let client_id = state.client_count.fetch_add(1, Ordering::SeqCst);
421                                tracing::debug!("client {} connected from {}", client_id, addr);
422
423                                let state = state.clone();
424                                tokio::task::spawn_local(async move {
425                                    if let Err(e) = handle_connection(stream, state, client_id).await {
426                                        tracing::error!("client {} error: {}", client_id, e);
427                                    }
428                                });
429                            }
430                            Err(e) => {
431                                tracing::error!("accept error: {}", e);
432                                break;
433                            }
434                        }
435                    }
436                    _ = tokio::signal::ctrl_c() => {
437                        break;
438                    }
439                }
440            }
441            Ok::<(), Error>(())
442        }).await?;
443
444        Ok(())
445    }
446}
447
448/// Shared server state
449struct ServerState {
450    tool_handler: Arc<dyn ToolHandler>,
451    resource_handler: Arc<dyn ResourceHandler>,
452    prompt_handler: Arc<dyn PromptHandler>,
453    log_handler: Arc<dyn LogHandler>,
454    server_info: ServerInfoDef,
455    client_count: AtomicU64,
456}
457
458/// Handle a single client connection
459async fn handle_connection(
460    stream: tokio::net::TcpStream,
461    state: Arc<ServerState>,
462    _client_id: u64,
463) -> Result<()> {
464    stream.set_nodelay(true)?;
465
466    // Convert tokio TcpStream to futures-compatible stream
467    let stream = tokio_util::compat::TokioAsyncReadCompatExt::compat(stream);
468    let (reader, writer) = stream.split();
469
470    // Create the Cap'n Proto RPC network
471    let network = twoparty::VatNetwork::new(
472        reader,
473        writer,
474        rpc_twoparty_capnp::Side::Server,
475        Default::default(),
476    );
477
478    // Create the Zap implementation
479    let zap_impl = ZapImpl::new(state);
480    let zap_client: zap::Client = capnp_rpc::new_client(zap_impl);
481
482    // Run the RPC system
483    let rpc_system = RpcSystem::new(Box::new(network), Some(zap_client.client));
484
485    rpc_system.await.map_err(Error::Capnp)
486}
487
488/// Implementation of the Zap interface
489struct ZapImpl {
490    state: Arc<ServerState>,
491}
492
493impl ZapImpl {
494    fn new(state: Arc<ServerState>) -> Self {
495        Self { state }
496    }
497}
498
499impl zap::Server for ZapImpl {
500    /// Initialize connection
501    fn init(
502        &mut self,
503        params: zap::InitParams,
504        mut results: zap::InitResults,
505    ) -> Promise<(), capnp::Error> {
506        let state = self.state.clone();
507
508        Promise::from_future(async move {
509            // Read client info
510            let client = params.get()?.get_client()?;
511            let client_name = client.get_name()?.to_str()?;
512            let client_version = client.get_version()?.to_str()?;
513
514            tracing::info!("client connected: {} v{}", client_name, client_version);
515
516            // Build server info response
517            let mut server = results.get().init_server();
518            server.set_name(&state.server_info.name);
519            server.set_version(&state.server_info.version);
520
521            let mut caps = server.init_capabilities();
522            caps.set_tools(state.server_info.tools);
523            caps.set_resources(state.server_info.resources);
524            caps.set_prompts(state.server_info.prompts);
525            caps.set_logging(state.server_info.logging);
526
527            Ok(())
528        })
529    }
530
531    /// List available tools
532    fn list_tools(
533        &mut self,
534        _params: zap::ListToolsParams,
535        mut results: zap::ListToolsResults,
536    ) -> Promise<(), capnp::Error> {
537        let handler = self.state.tool_handler.clone();
538
539        Promise::from_future(async move {
540            let tools = handler.list().await;
541
542            let tool_list = results.get().init_tools();
543            let mut builder = tool_list.init_tools(tools.len() as u32);
544
545            for (i, t) in tools.iter().enumerate() {
546                let mut tool = builder.reborrow().get(i as u32);
547                tool.set_name(&t.name);
548                tool.set_description(&t.description);
549                tool.set_schema(&t.schema);
550
551                // Set annotations
552                if !t.annotations.is_empty() {
553                    let annotations = tool.init_annotations();
554                    let mut entries = annotations.init_entries(t.annotations.len() as u32);
555                    for (j, (k, v)) in t.annotations.iter().enumerate() {
556                        let mut entry = entries.reborrow().get(j as u32);
557                        entry.set_key(k);
558                        entry.set_value(v);
559                    }
560                }
561            }
562
563            Ok(())
564        })
565    }
566
567    /// Call a tool
568    fn call_tool(
569        &mut self,
570        params: zap::CallToolParams,
571        mut results: zap::CallToolResults,
572    ) -> Promise<(), capnp::Error> {
573        let handler = self.state.tool_handler.clone();
574
575        Promise::from_future(async move {
576            let call = params.get()?.get_call()?;
577            let id = call.get_id()?.to_str()?;
578            let name = call.get_name()?.to_str()?;
579            let args = call.get_args()?;
580
581            // Extract metadata
582            let mut metadata = HashMap::new();
583            if call.has_metadata() {
584                let md = call.get_metadata()?;
585                if md.has_entries() {
586                    for entry in md.get_entries()? {
587                        let key = entry.get_key()?.to_str()?;
588                        let value = entry.get_value()?.to_str()?;
589                        metadata.insert(key.to_string(), value.to_string());
590                    }
591                }
592            }
593
594            // Call the handler
595            let result = handler.call(name, args, metadata).await;
596
597            // Build response
598            let mut tool_result = results.get().init_result();
599            tool_result.set_id(id);
600
601            match result {
602                Ok(content) => {
603                    tool_result.set_content(&content);
604                }
605                Err(e) => {
606                    tool_result.set_error(&e);
607                }
608            }
609
610            Ok(())
611        })
612    }
613
614    /// List available resources
615    fn list_resources(
616        &mut self,
617        _params: zap::ListResourcesParams,
618        mut results: zap::ListResourcesResults,
619    ) -> Promise<(), capnp::Error> {
620        let handler = self.state.resource_handler.clone();
621
622        Promise::from_future(async move {
623            let resources = handler.list().await;
624
625            let resource_list = results.get().init_resources();
626            let mut builder = resource_list.init_resources(resources.len() as u32);
627
628            for (i, r) in resources.iter().enumerate() {
629                let mut resource = builder.reborrow().get(i as u32);
630                resource.set_uri(&r.uri);
631                resource.set_name(&r.name);
632                resource.set_description(&r.description);
633                resource.set_mime_type(&r.mime_type);
634
635                if !r.annotations.is_empty() {
636                    let annotations = resource.init_annotations();
637                    let mut entries = annotations.init_entries(r.annotations.len() as u32);
638                    for (j, (k, v)) in r.annotations.iter().enumerate() {
639                        let mut entry = entries.reborrow().get(j as u32);
640                        entry.set_key(k);
641                        entry.set_value(v);
642                    }
643                }
644            }
645
646            Ok(())
647        })
648    }
649
650    /// Read a resource
651    fn read_resource(
652        &mut self,
653        params: zap::ReadResourceParams,
654        mut results: zap::ReadResourceResults,
655    ) -> Promise<(), capnp::Error> {
656        let handler = self.state.resource_handler.clone();
657
658        Promise::from_future(async move {
659            let uri = params.get()?.get_uri()?.to_str()?;
660
661            let result = handler.read(uri).await;
662
663            let mut content = results.get().init_content();
664
665            match result {
666                Ok(data) => {
667                    content.set_uri(&data.uri);
668                    content.set_mime_type(&data.mime_type);
669
670                    match data.content {
671                        ResourceContentData::Text(text) => {
672                            content.init_content().set_text(&text);
673                        }
674                        ResourceContentData::Blob(blob) => {
675                            content.init_content().set_blob(&blob);
676                        }
677                    }
678                }
679                Err(e) => {
680                    // Set error as text content
681                    content.set_uri(uri);
682                    content.set_mime_type("text/plain");
683                    content.init_content().set_text(&format!("error: {}", e));
684                }
685            }
686
687            Ok(())
688        })
689    }
690
691    /// Subscribe to resource updates
692    fn subscribe(
693        &mut self,
694        params: zap::SubscribeParams,
695        mut results: zap::SubscribeResults,
696    ) -> Promise<(), capnp::Error> {
697        let handler = self.state.resource_handler.clone();
698
699        Promise::from_future(async move {
700            let uri = params.get()?.get_uri()?.to_str()?.to_string();
701
702            let result = handler.subscribe(&uri).await;
703
704            match result {
705                Ok(receiver) => {
706                    let stream_impl = ResourceStreamImpl::new(uri, receiver);
707                    let stream_client: resource_stream::Client =
708                        capnp_rpc::new_client(stream_impl);
709                    results.get().set_stream(stream_client);
710                }
711                Err(_e) => {
712                    // Return an empty stream that immediately completes
713                    let (_, receiver) = tokio::sync::mpsc::channel(1);
714                    let stream_impl = ResourceStreamImpl::new(uri, receiver);
715                    let stream_client: resource_stream::Client =
716                        capnp_rpc::new_client(stream_impl);
717                    results.get().set_stream(stream_client);
718                }
719            }
720
721            Ok(())
722        })
723    }
724
725    /// List available prompts
726    fn list_prompts(
727        &mut self,
728        _params: zap::ListPromptsParams,
729        mut results: zap::ListPromptsResults,
730    ) -> Promise<(), capnp::Error> {
731        let handler = self.state.prompt_handler.clone();
732
733        Promise::from_future(async move {
734            let prompts = handler.list().await;
735
736            let prompt_list = results.get().init_prompts();
737            let mut builder = prompt_list.init_prompts(prompts.len() as u32);
738
739            for (i, p) in prompts.iter().enumerate() {
740                let mut prompt = builder.reborrow().get(i as u32);
741                prompt.set_name(&p.name);
742                prompt.set_description(&p.description);
743
744                let mut args = prompt.init_arguments(p.arguments.len() as u32);
745                for (j, arg) in p.arguments.iter().enumerate() {
746                    let mut a = args.reborrow().get(j as u32);
747                    a.set_name(&arg.name);
748                    a.set_description(&arg.description);
749                    a.set_required(arg.required);
750                }
751            }
752
753            Ok(())
754        })
755    }
756
757    /// Get a prompt
758    fn get_prompt(
759        &mut self,
760        params: zap::GetPromptParams,
761        mut results: zap::GetPromptResults,
762    ) -> Promise<(), capnp::Error> {
763        let handler = self.state.prompt_handler.clone();
764
765        Promise::from_future(async move {
766            let params_reader = params.get()?;
767            let name = params_reader.get_name()?.to_str()?;
768
769            // Extract args
770            let mut args = HashMap::new();
771            if params_reader.has_args() {
772                let md = params_reader.get_args()?;
773                if md.has_entries() {
774                    for entry in md.get_entries()? {
775                        let key = entry.get_key()?.to_str()?;
776                        let value = entry.get_value()?.to_str()?;
777                        args.insert(key.to_string(), value.to_string());
778                    }
779                }
780            }
781
782            let result = handler.get(name, args).await;
783
784            match result {
785                Ok(messages) => {
786                    let mut builder = results.get().init_messages(messages.len() as u32);
787
788                    for (i, msg) in messages.iter().enumerate() {
789                        let mut m = builder.reborrow().get(i as u32);
790
791                        // Set role
792                        match msg.role {
793                            PromptRole::User => m.set_role(prompt_message::Role::User),
794                            PromptRole::Assistant => m.set_role(prompt_message::Role::Assistant),
795                            PromptRole::System => m.set_role(prompt_message::Role::System),
796                        }
797
798                        // Set content
799                        let mut content = m.init_content();
800                        match &msg.content {
801                            PromptContent::Text(text) => {
802                                content.set_text(text);
803                            }
804                            PromptContent::Image { data, mime_type } => {
805                                let mut img = content.init_image();
806                                img.set_data(data);
807                                img.set_mime_type(mime_type);
808                            }
809                            PromptContent::Resource(r) => {
810                                let mut res = content.init_resource();
811                                res.set_uri(&r.uri);
812                                res.set_mime_type(&r.mime_type);
813                                match &r.content {
814                                    ResourceContentData::Text(t) => {
815                                        res.init_content().set_text(t);
816                                    }
817                                    ResourceContentData::Blob(b) => {
818                                        res.init_content().set_blob(b);
819                                    }
820                                }
821                            }
822                        }
823                    }
824                }
825                Err(_e) => {
826                    // Return empty list on error
827                    results.get().init_messages(0);
828                }
829            }
830
831            Ok(())
832        })
833    }
834
835    /// Log a message
836    fn log(
837        &mut self,
838        params: zap::LogParams,
839        _results: zap::LogResults,
840    ) -> Promise<(), capnp::Error> {
841        let handler = self.state.log_handler.clone();
842
843        Promise::from_future(async move {
844            let params_reader = params.get()?;
845            let level = match params_reader.get_level()? {
846                zap::LogLevel::Debug => LogLevel::Debug,
847                zap::LogLevel::Info => LogLevel::Info,
848                zap::LogLevel::Warn => LogLevel::Warn,
849                zap::LogLevel::Error => LogLevel::Error,
850            };
851            let message = params_reader.get_message()?.to_str()?;
852            let data = params_reader.get_data()?;
853
854            handler.log(level, message, data);
855
856            Ok(())
857        })
858    }
859}
860
861/// Implementation of ResourceStream interface
862struct ResourceStreamImpl {
863    uri: String,
864    receiver: std::cell::RefCell<tokio::sync::mpsc::Receiver<ResourceContentDef>>,
865    done: std::cell::Cell<bool>,
866}
867
868impl ResourceStreamImpl {
869    fn new(uri: String, receiver: tokio::sync::mpsc::Receiver<ResourceContentDef>) -> Self {
870        Self {
871            uri,
872            receiver: std::cell::RefCell::new(receiver),
873            done: std::cell::Cell::new(false),
874        }
875    }
876}
877
878impl resource_stream::Server for ResourceStreamImpl {
879    fn next(
880        &mut self,
881        _params: resource_stream::NextParams,
882        mut results: resource_stream::NextResults,
883    ) -> Promise<(), capnp::Error> {
884        if self.done.get() {
885            return Promise::from_future(async move {
886                results.get().set_done(true);
887                Ok(())
888            });
889        }
890
891        // Since we're running on LocalSet, we don't need Send
892        let receiver = &self.receiver;
893        let done_flag = &self.done;
894
895        // Use a simple approach: try to receive now
896        let mut guard = receiver.borrow_mut();
897        match guard.try_recv() {
898            Ok(data) => {
899                drop(guard);
900                Promise::from_future(async move {
901                    let mut content = results.get().init_content();
902                    content.set_uri(&data.uri);
903                    content.set_mime_type(&data.mime_type);
904
905                    match data.content {
906                        ResourceContentData::Text(text) => {
907                            content.init_content().set_text(&text);
908                        }
909                        ResourceContentData::Blob(blob) => {
910                            content.init_content().set_blob(&blob);
911                        }
912                    }
913
914                    results.get().set_done(false);
915                    Ok(())
916                })
917            }
918            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
919                // Channel is empty but not closed - return not done
920                drop(guard);
921                Promise::from_future(async move {
922                    results.get().set_done(false);
923                    Ok(())
924                })
925            }
926            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
927                done_flag.set(true);
928                drop(guard);
929                Promise::from_future(async move {
930                    results.get().set_done(true);
931                    Ok(())
932                })
933            }
934        }
935    }
936
937    fn cancel(
938        &mut self,
939        _params: resource_stream::CancelParams,
940        _results: resource_stream::CancelResults,
941    ) -> Promise<(), capnp::Error> {
942        self.done.set(true);
943        Promise::ok(())
944    }
945}
946
947#[cfg(test)]
948mod tests {
949    use super::*;
950
951    #[test]
952    fn test_server_new() {
953        let config = Config::default();
954        let server = Server::new(config);
955        assert_eq!(server.server_info.name, "zap");
956    }
957
958    #[test]
959    fn test_tool_def() {
960        let tool = ToolDef {
961            name: "test".into(),
962            description: "A test tool".into(),
963            schema: b"{}".to_vec(),
964            annotations: HashMap::new(),
965        };
966        assert_eq!(tool.name, "test");
967    }
968
969    #[test]
970    fn test_resource_content_data() {
971        let text = ResourceContentData::Text("hello".into());
972        assert!(matches!(text, ResourceContentData::Text(_)));
973
974        let blob = ResourceContentData::Blob(vec![1, 2, 3]);
975        assert!(matches!(blob, ResourceContentData::Blob(_)));
976    }
977
978    #[test]
979    fn test_log_levels() {
980        assert_ne!(LogLevel::Debug, LogLevel::Error);
981        assert_eq!(LogLevel::Info, LogLevel::Info);
982    }
983
984    #[test]
985    fn test_server_info_default() {
986        let info = ServerInfoDef::default();
987        assert_eq!(info.name, "zap");
988        assert!(info.tools);
989        assert!(info.resources);
990        assert!(info.prompts);
991        assert!(info.logging);
992    }
993}