Skip to main content

a2ui_base/
message_processor.rs

1//! Message processor — parses A2UI messages and mutates state models.
2
3use std::collections::HashMap;
4
5use crate::catalog::Catalog;
6use crate::error::{A2uiError, Result};
7use crate::model::data_model::DataModel;
8use crate::model::surface_model::SurfaceModel;
9use crate::model::surface_group_model::SurfaceGroupModel;
10use crate::protocol::client_to_server::{
11    ClientMessage, ClientPayload, ErrorData, ErrorPayload, FunctionResponseData,
12    FunctionResponsePayload,
13};
14use crate::protocol::server_to_client::{
15    A2uiMessage, A2uiPayload, CallFunctionPayload, CreateSurfaceData, DeleteSurfaceData,
16    UpdateComponentsData, UpdateDataModelData,
17};
18
19/// Parses A2UI JSON messages and applies them to the state models.
20pub struct MessageProcessor {
21    /// The state model (all active surfaces).
22    pub model: SurfaceGroupModel,
23    /// Registered catalogs keyed by catalog ID.
24    #[allow(dead_code)]
25    catalogs: HashMap<String, Catalog>,
26    /// Outgoing client-to-server messages produced during processing.
27    outgoing_messages: Vec<ClientMessage>,
28}
29
30impl MessageProcessor {
31    /// Create a new processor with the given catalogs.
32    pub fn new(catalogs: Vec<Catalog>) -> Self {
33        let catalog_map: HashMap<String, Catalog> = catalogs
34            .into_iter()
35            .map(|c| (c.id.clone(), c))
36            .collect();
37        Self {
38            model: SurfaceGroupModel::new(),
39            catalogs: catalog_map,
40            outgoing_messages: Vec::new(),
41        }
42    }
43
44    /// Reset all processed state (surfaces and outgoing messages) while
45    /// keeping the registered catalogs intact.
46    ///
47    /// Use this to replay a sample from scratch instead of constructing a new
48    /// processor with `MessageProcessor::new(vec![])`, which would silently
49    /// drop the catalogs and cause every component type to be flagged as
50    /// "unknown".
51    pub fn reset(&mut self) {
52        self.model = SurfaceGroupModel::new();
53        self.outgoing_messages.clear();
54    }
55
56    /// Parse a raw JSON string into an A2uiMessage.
57    pub fn parse_message(json: &str) -> Result<A2uiMessage> {
58        let msg: A2uiMessage = serde_json::from_str(json)?;
59        Ok(msg)
60    }
61
62    /// Parse a JSONL stream (newline-delimited JSON objects).
63    pub fn parse_jsonl(jsonl: &str) -> Vec<Result<A2uiMessage>> {
64        jsonl.lines()
65            .filter(|line| !line.trim().is_empty())
66            .map(|line| Self::parse_message(line))
67            .collect()
68    }
69
70    /// Process a single parsed message.
71    pub fn process_message(&mut self, msg: A2uiMessage) -> Result<()> {
72        match &msg.payload {
73            A2uiPayload::CreateSurface(payload) => {
74                self.handle_create_surface(&payload.create_surface)
75            }
76            A2uiPayload::UpdateComponents(payload) => {
77                self.handle_update_components(&payload.update_components)
78            }
79            A2uiPayload::UpdateDataModel(payload) => {
80                self.handle_update_data_model(&payload.update_data_model)
81            }
82            A2uiPayload::DeleteSurface(payload) => {
83                self.handle_delete_surface(&payload.delete_surface)
84            }
85            A2uiPayload::CallFunction(payload) => {
86                self.handle_call_function(payload)
87            }
88            A2uiPayload::ActionResponse(payload) => {
89                self.handle_action_response(payload)
90            }
91        }
92    }
93
94    /// Process multiple messages sequentially.
95    pub fn process_messages(&mut self, messages: Vec<A2uiMessage>) -> Vec<Result<()>> {
96        messages.into_iter().map(|m| self.process_message(m)).collect()
97    }
98
99    /// Drain outgoing client-to-server messages produced during processing.
100    ///
101    /// Call this after `process_message` / `process_messages` to retrieve
102    /// any `functionResponse`, `error`, or other client messages that should
103    /// be sent back to the server.
104    pub fn drain_outgoing(&mut self) -> Vec<ClientMessage> {
105        std::mem::take(&mut self.outgoing_messages)
106    }
107
108    /// Check if a component type exists in any registered catalog.
109    pub fn catalog_type_exists(&self, component_type: &str) -> bool {
110        self.catalogs.values()
111            .any(|cat| cat.components.contains_key(component_type))
112    }
113
114    /// Return the IDs of all registered catalogs (native + inline).
115    pub fn registered_catalog_ids(&self) -> Vec<String> {
116        self.catalogs.keys().cloned().collect()
117    }
118
119    /// Register an inline catalog from a raw JSON value.
120    ///
121    /// The catalog is parsed via [`capabilities::parse_inline_catalog`]. Each
122    /// declared function becomes a [`SchemaOnlyFunction`] in a fresh
123    /// [`Catalog`] (so `handle_call_function` can discover and reject
124    /// execution attempts). Declared components have no native renderer and
125    /// are *not* added to `catalog.components` — at render time they fall
126    /// back to the generic renderer.
127    pub fn register_inline_catalog(&mut self, json: serde_json::Value) -> Result<()> {
128        let parsed = crate::capabilities::parse_inline_catalog(&json)?;
129
130        let mut catalog = Catalog::new(parsed.catalog_id.clone());
131        for func in &parsed.functions {
132            let return_type = crate::catalog::schema_only::parse_return_type(&func.return_type);
133            let schema_func = crate::catalog::schema_only::SchemaOnlyFunction::new(
134                func.name.clone(),
135                return_type,
136            );
137            catalog = catalog.with_function(Box::new(schema_func));
138        }
139
140        self.catalogs.insert(parsed.catalog_id, catalog);
141        Ok(())
142    }
143
144    /// Register a pending action that expects a server response.
145    ///
146    /// Call this when the caller sends an `action` message with
147    /// `wantResponse: true`. The `response_path` (if any) tells the
148    /// processor where to store the server's response value in the data model.
149    pub fn register_action(
150        &mut self,
151        surface_id: &str,
152        action_id: &str,
153        response_path: Option<String>,
154    ) -> Result<()> {
155        let surface = self
156            .model
157            .get_surface_mut(surface_id)
158            .ok_or_else(|| A2uiError::SurfaceNotFound(surface_id.to_string()))?;
159        surface
160            .pending_actions
161            .borrow_mut()
162            .insert(
163                action_id.to_string(),
164                crate::model::surface_model::PendingAction {
165                    action_id: action_id.to_string(),
166                    response_path,
167                },
168            );
169        Ok(())
170    }
171
172    /// Load a sample file (wrapping messages in {name, description, messages}).
173    pub fn load_sample(json: &str) -> Result<(String, String, Vec<A2uiMessage>)> {
174        let sample: serde_json::Value = serde_json::from_str(json)?;
175
176        let name = sample["name"].as_str().unwrap_or("unnamed").to_string();
177        let description = sample["description"].as_str().unwrap_or("").to_string();
178
179        let messages_val = sample["messages"].as_array().ok_or_else(|| {
180            A2uiError::Validation("sample missing 'messages' array".into())
181        })?;
182
183        let messages: Vec<A2uiMessage> = messages_val
184            .iter()
185            .filter_map(|v| serde_json::from_value(v.clone()).ok())
186            .collect();
187
188        Ok((name, description, messages))
189    }
190
191    // -----------------------------------------------------------------------
192    // Message handlers
193    // -----------------------------------------------------------------------
194
195    fn handle_create_surface(&mut self, data: &CreateSurfaceData) -> Result<()> {
196        // Validate surface doesn't already exist
197        if self.model.get_surface(&data.surface_id).is_some() {
198            return Err(A2uiError::SurfaceExists(data.surface_id.clone()));
199        }
200
201        let mut surface = SurfaceModel::new(
202            data.surface_id.clone(),
203            data.catalog_id.clone(),
204            data.surface_properties.clone(),
205            data.send_data_model,
206        );
207
208        // Initialize data model if provided
209        if let Some(dm) = &data.data_model {
210            surface = surface.with_data_model(dm.clone());
211        }
212
213        // Parse and add components if provided
214        if let Some(components) = &data.components {
215            surface.components.borrow_mut().add_from_json(components);
216        }
217
218        self.model.add_surface(surface)
219    }
220
221    fn handle_update_components(&mut self, data: &UpdateComponentsData) -> Result<()> {
222        // Graceful degradation: unknown component types are still added below
223        // via add_from_json. We intentionally do NOT eprintln diagnostics here
224        // — this is a library, and writing to stderr corrupts TUI consumers
225        // (e.g. the gallery app renders into stderr).
226        let surface = self.model.get_surface_mut(&data.surface_id)
227            .ok_or_else(|| A2uiError::SurfaceNotFound(data.surface_id.clone()))?;
228
229        surface.components.borrow_mut().add_from_json(&data.components);
230        Ok(())
231    }
232
233    fn handle_update_data_model(&mut self, data: &UpdateDataModelData) -> Result<()> {
234        let surface = self.model.get_surface_mut(&data.surface_id)
235            .ok_or_else(|| A2uiError::SurfaceNotFound(data.surface_id.clone()))?;
236
237        let path = data.path.as_deref().unwrap_or("/");
238        let value = data.value.clone().unwrap_or(serde_json::Value::Null);
239
240        if path == "/" || path.is_empty() {
241            if value.is_null() {
242                surface.data_model.borrow_mut().replace_all(serde_json::json!({}));
243            } else {
244                surface.data_model.borrow_mut().replace_all(value);
245            }
246        } else {
247            surface.data_model.borrow_mut().set(path, value);
248        }
249        Ok(())
250    }
251
252    fn handle_delete_surface(&mut self, data: &DeleteSurfaceData) -> Result<()> {
253        self.model.delete_surface(&data.surface_id)
254    }
255
256    fn handle_call_function(&mut self, payload: &CallFunctionPayload) -> Result<()> {
257        let fc = &payload.call_function;
258        let call_id = &payload.function_call_id;
259
260        // 1. Find the function across all catalogs
261        let mut found_func: Option<&dyn crate::catalog::function_api::FunctionImplementation> = None;
262        let mut found_functions_map: Option<&std::collections::HashMap<String, Box<dyn crate::catalog::function_api::FunctionImplementation>>> = None;
263
264        for catalog in self.catalogs.values() {
265            if let Some(f) = catalog.get_function(&fc.call) {
266                found_func = Some(f);
267                found_functions_map = Some(&catalog.functions);
268                break;
269            }
270        }
271
272        // 2. Function not found → reject with error
273        let Some(func) = found_func else {
274            self.queue_outgoing(ClientMessage {
275                version: "v1.0".to_string(),
276                payload: ClientPayload::Error(ErrorPayload {
277                    error: ErrorData {
278                        code: "INVALID_FUNCTION_CALL".to_string(),
279                        message: format!("function not found: {}", fc.call),
280                        surface_id: None,
281                        function_call_id: Some(call_id.clone()),
282                    },
283                }),
284            });
285            return Ok(());
286        };
287
288        // 3. Build a DataContext using the first available surface's DataModel.
289        //    We execute and collect results in a block so the borrows are dropped
290        //    before we call queue_outgoing (which needs &mut self).
291        let execution_result: std::result::Result<serde_json::Value, A2uiError> = {
292            let empty_dm;
293            let data_model: &DataModel = match self.model.surfaces().next() {
294                Some(surface) => &surface.data_model.borrow(),
295                None => {
296                    empty_dm = DataModel::new();
297                    &empty_dm
298                }
299            };
300            let functions_map = found_functions_map.unwrap();
301            let ctx = crate::model::data_context::DataContext::new(data_model, functions_map);
302
303            // 4. Resolve args (may contain path bindings or nested function calls)
304            let mut resolved_args = HashMap::new();
305            for (key, val) in &fc.args {
306                let resolved = ctx.resolve_dynamic_value(
307                    &serde_json::from_value::<crate::protocol::common_types::DynamicValue>(val.clone())
308                        .unwrap_or(crate::protocol::common_types::DynamicValue::String(val.to_string())),
309                );
310                resolved_args.insert(key.clone(), resolved);
311            }
312
313            // 5. Execute the function
314            func.execute(&resolved_args, &ctx)
315        };
316        // borrows on self.model and self.catalogs are released here
317
318        // 6. Queue outgoing messages based on result
319        match execution_result {
320            Ok(result) => {
321                if payload.want_response {
322                    self.queue_outgoing(ClientMessage {
323                        version: "v1.0".to_string(),
324                        payload: ClientPayload::FunctionResponse(FunctionResponsePayload {
325                            function_response: FunctionResponseData {
326                                function_call_id: call_id.clone(),
327                                call: fc.call.clone(),
328                                value: result,
329                            },
330                        }),
331                    });
332                }
333            }
334            Err(e) => {
335                self.queue_outgoing(ClientMessage {
336                    version: "v1.0".to_string(),
337                    payload: ClientPayload::Error(ErrorPayload {
338                        error: ErrorData {
339                            code: "INVALID_FUNCTION_CALL".to_string(),
340                            message: e.to_string(),
341                            surface_id: None,
342                            function_call_id: Some(call_id.clone()),
343                        },
344                    }),
345                });
346            }
347        }
348
349        Ok(())
350    }
351
352    fn handle_action_response(
353        &mut self,
354        payload: &crate::protocol::server_to_client::ActionResponsePayload,
355    ) -> Result<()> {
356        let action_id = &payload.action_id;
357
358        // Search all surfaces for the pending action
359        for surface in self.model.surfaces_mut() {
360            let pending = surface.pending_actions.borrow_mut().remove(action_id);
361            if let Some(pa) = pending {
362                if let Some(ref path) = pa.response_path {
363                    if let Some(ref value) = payload.action_response.value {
364                        surface.data_model.borrow_mut().set(path, value.clone());
365                    }
366                }
367                return Ok(());
368            }
369        }
370
371        // No pending action found for this action_id — silently ignore
372        // (the action may not have had wantResponse, or was already handled)
373        Ok(())
374    }
375
376    /// Queue an outgoing client-to-server message.
377    fn queue_outgoing(&mut self, msg: ClientMessage) {
378        self.outgoing_messages.push(msg);
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use super::*;
385
386    fn make_processor() -> MessageProcessor {
387        MessageProcessor::new(vec![])
388    }
389
390    #[test]
391    fn test_create_and_delete_surface() {
392        let mut proc = make_processor();
393
394        let msg = serde_json::json!({
395            "version": "v1.0",
396            "createSurface": {
397                "surfaceId": "test_1",
398                "catalogId": "https://example.com/catalog.json"
399            }
400        });
401        let parsed = MessageProcessor::parse_message(&msg.to_string()).unwrap();
402        proc.process_message(parsed).unwrap();
403
404        assert!(proc.model.get_surface("test_1").is_some());
405
406        let del = serde_json::json!({
407            "version": "v1.0",
408            "deleteSurface": {
409                "surfaceId": "test_1"
410            }
411        });
412        let parsed = MessageProcessor::parse_message(&del.to_string()).unwrap();
413        proc.process_message(parsed).unwrap();
414
415        assert!(proc.model.get_surface("test_1").is_none());
416    }
417
418    #[test]
419    fn test_update_components() {
420        let mut proc = make_processor();
421
422        // Create surface
423        let create = serde_json::json!({
424            "version": "v1.0",
425            "createSurface": {
426                "surfaceId": "s1",
427                "catalogId": "test"
428            }
429        });
430        proc.process_message(MessageProcessor::parse_message(&create.to_string()).unwrap()).unwrap();
431
432        // Update components
433        let update = serde_json::json!({
434            "version": "v1.0",
435            "updateComponents": {
436                "surfaceId": "s1",
437                "components": [
438                    {"id": "root", "component": "Column", "children": ["hello"]},
439                    {"id": "hello", "component": "Text", "text": "Hello World"}
440                ]
441            }
442        });
443        proc.process_message(MessageProcessor::parse_message(&update.to_string()).unwrap()).unwrap();
444
445        let surface = proc.model.get_surface("s1").unwrap();
446        let components = surface.components.borrow();
447        assert!(components.contains("root"));
448        assert!(components.contains("hello"));
449        assert_eq!(components.get("hello").unwrap().component_type, "Text");
450    }
451
452    #[test]
453    fn test_update_data_model() {
454        let mut proc = make_processor();
455
456        let create = serde_json::json!({
457            "version": "v1.0",
458            "createSurface": {
459                "surfaceId": "s1",
460                "catalogId": "test",
461                "dataModel": {"name": "Alice"}
462            }
463        });
464        proc.process_message(MessageProcessor::parse_message(&create.to_string()).unwrap()).unwrap();
465
466        // Update a field
467        let update = serde_json::json!({
468            "version": "v1.0",
469            "updateDataModel": {
470                "surfaceId": "s1",
471                "path": "/name",
472                "value": "Bob"
473            }
474        });
475        proc.process_message(MessageProcessor::parse_message(&update.to_string()).unwrap()).unwrap();
476
477        let surface = proc.model.get_surface("s1").unwrap();
478        assert_eq!(
479            surface.data_model.borrow().get("/name"),
480            Some(&serde_json::json!("Bob"))
481        );
482    }
483
484    #[test]
485    fn test_duplicate_surface_error() {
486        let mut proc = make_processor();
487
488        let create = serde_json::json!({
489            "version": "v1.0",
490            "createSurface": {
491                "surfaceId": "dup",
492                "catalogId": "test"
493            }
494        });
495        proc.process_message(MessageProcessor::parse_message(&create.to_string()).unwrap()).unwrap();
496
497        let result = proc.process_message(MessageProcessor::parse_message(&create.to_string()).unwrap());
498        assert!(result.is_err());
499    }
500
501    #[test]
502    fn test_parse_jsonl() {
503        let jsonl = r#"
504{"version":"v1.0","createSurface":{"surfaceId":"main","catalogId":"test"}}
505{"version":"v1.0","updateComponents":{"surfaceId":"main","components":[{"id":"root","component":"Text","text":"Hi"}]}}
506"#;
507        let messages = MessageProcessor::parse_jsonl(jsonl);
508        assert_eq!(messages.len(), 2);
509        assert!(messages[0].is_ok());
510        assert!(messages[1].is_ok());
511    }
512
513    #[test]
514    fn test_spec_simple_text_sample() {
515        let sample = r#"{
516            "name": "Simple Text",
517            "description": "Basic text rendering",
518            "messages": [
519                {
520                    "version": "v1.0",
521                    "createSurface": {
522                        "surfaceId": "example_1",
523                        "catalogId": "https://a2ui.org/specification/v1_0/catalogs/minimal/catalog.json"
524                    }
525                },
526                {
527                    "version": "v1.0",
528                    "updateComponents": {
529                        "surfaceId": "example_1",
530                        "components": [
531                            {"id": "root", "component": "Text", "text": "Hello, Minimal Catalog!", "variant": "h1"}
532                        ]
533                    }
534                }
535            ]
536        }"#;
537
538        let (name, desc, messages) = MessageProcessor::load_sample(sample).unwrap();
539        assert_eq!(name, "Simple Text");
540        assert_eq!(messages.len(), 2);
541
542        let mut proc = make_processor();
543        let results = proc.process_messages(messages);
544        assert!(results.iter().all(|r| r.is_ok()));
545
546        let surface = proc.model.get_surface("example_1").unwrap();
547        let components = surface.components.borrow();
548        let root = components.get("root").unwrap();
549        assert_eq!(root.component_type, "Text");
550        assert_eq!(root.get_raw("text").unwrap(), "Hello, Minimal Catalog!");
551        assert_eq!(root.get_raw("variant").unwrap(), "h1");
552    }
553
554    #[test]
555    fn test_spec_login_form_sample() {
556        let sample = r#"{
557            "name": "Login Form",
558            "description": "Form with input fields and action",
559            "messages": [
560                {
561                    "version": "v1.0",
562                    "createSurface": {
563                        "surfaceId": "example_4",
564                        "catalogId": "https://a2ui.org/specification/v1_0/catalogs/minimal/catalog.json",
565                        "sendDataModel": true
566                    }
567                },
568                {
569                    "version": "v1.0",
570                    "updateComponents": {
571                        "surfaceId": "example_4",
572                        "components": [
573                            {"id": "root", "component": "Column", "children": ["form_title", "username_field", "password_field", "submit_button"], "justify": "start", "align": "stretch"},
574                            {"id": "form_title", "component": "Text", "text": "Login", "variant": "h2"},
575                            {"id": "username_field", "component": "TextField", "label": "Username", "value": {"path": "/username"}, "variant": "shortText"},
576                            {"id": "password_field", "component": "TextField", "label": "Password", "value": {"path": "/password"}, "variant": "obscured"},
577                            {"id": "submit_button", "component": "Button", "child": "submit_label", "variant": "primary", "action": {"event": {"name": "login_submitted", "context": {"user": {"path": "/username"}, "pass": {"path": "/password"}}}}},
578                            {"id": "submit_label", "component": "Text", "text": "Sign In"}
579                        ]
580                    }
581                }
582            ]
583        }"#;
584
585        let (_name, _desc, messages) = MessageProcessor::load_sample(sample).unwrap();
586        assert_eq!(messages.len(), 2);
587
588        let mut proc = make_processor();
589        let results = proc.process_messages(messages);
590        assert!(results.iter().all(|r| r.is_ok()));
591
592        let surface = proc.model.get_surface("example_4").unwrap();
593        assert!(surface.send_data_model);
594
595        let components = surface.components.borrow();
596        assert_eq!(components.len(), 6);
597
598        let root = components.get("root").unwrap();
599        assert_eq!(root.component_type, "Column");
600        let children = root.children().unwrap();
601        match children {
602            crate::protocol::common_types::ChildList::Static(ids) => {
603                assert_eq!(ids.len(), 4);
604                assert_eq!(ids[0], "form_title");
605            }
606            _ => panic!("expected static children"),
607        }
608
609        let submit = components.get("submit_button").unwrap();
610        assert_eq!(submit.component_type, "Button");
611        assert!(submit.action().is_some());
612    }
613
614}