1use std::collections::HashMap;
4
5use crate::catalog::Catalog;
6use crate::error::{A2uiError, Result};
7use crate::model::data_model::DataModel;
8use crate::model::surface_model::SurfaceModel;
9use crate::model::surface_group_model::SurfaceGroupModel;
10use crate::protocol::client_to_server::{
11 ClientMessage, ClientPayload, ErrorData, ErrorPayload, FunctionResponseData,
12 FunctionResponsePayload,
13};
14use crate::protocol::server_to_client::{
15 A2uiMessage, A2uiPayload, CallFunctionPayload, CreateSurfaceData, DeleteSurfaceData,
16 UpdateComponentsData, UpdateDataModelData,
17};
18
19pub struct MessageProcessor {
21 pub model: SurfaceGroupModel,
23 #[allow(dead_code)]
25 catalogs: HashMap<String, Catalog>,
26 outgoing_messages: Vec<ClientMessage>,
28 validation: Option<crate::validate::ValidationConfig>,
32 pending_validation: crate::validate::ValidationReport,
35}
36
37impl MessageProcessor {
38 pub fn new(catalogs: Vec<Catalog>) -> Self {
40 let catalog_map: HashMap<String, Catalog> = catalogs
41 .into_iter()
42 .map(|c| (c.id.clone(), c))
43 .collect();
44 Self {
45 model: SurfaceGroupModel::new(),
46 catalogs: catalog_map,
47 outgoing_messages: Vec::new(),
48 validation: None,
49 pending_validation: crate::validate::ValidationReport::new(),
50 }
51 }
52
53 pub fn reset(&mut self) {
61 self.model = SurfaceGroupModel::new();
62 self.outgoing_messages.clear();
63 self.pending_validation = crate::validate::ValidationReport::new();
64 }
65
66 pub fn parse_message(json: &str) -> Result<A2uiMessage> {
68 let msg: A2uiMessage = serde_json::from_str(json)?;
69 Ok(msg)
70 }
71
72 pub fn parse_jsonl(jsonl: &str) -> Vec<Result<A2uiMessage>> {
74 jsonl.lines()
75 .filter(|line| !line.trim().is_empty())
76 .map(|line| Self::parse_message(line))
77 .collect()
78 }
79
80 pub fn process_message(&mut self, msg: A2uiMessage) -> Result<()> {
82 match &msg.payload {
83 A2uiPayload::CreateSurface(payload) => {
84 self.handle_create_surface(&payload.create_surface)
85 }
86 A2uiPayload::UpdateComponents(payload) => {
87 self.handle_update_components(&payload.update_components)
88 }
89 A2uiPayload::UpdateDataModel(payload) => {
90 self.handle_update_data_model(&payload.update_data_model)
91 }
92 A2uiPayload::DeleteSurface(payload) => {
93 self.handle_delete_surface(&payload.delete_surface)
94 }
95 A2uiPayload::CallFunction(payload) => {
96 self.handle_call_function(payload)
97 }
98 A2uiPayload::ActionResponse(payload) => {
99 self.handle_action_response(payload)
100 }
101 }
102 }
103
104 pub fn process_messages(&mut self, messages: Vec<A2uiMessage>) -> Vec<Result<()>> {
106 messages.into_iter().map(|m| self.process_message(m)).collect()
107 }
108
109 pub fn drain_outgoing(&mut self) -> Vec<ClientMessage> {
115 std::mem::take(&mut self.outgoing_messages)
116 }
117
118 pub fn with_validation(mut self, cfg: crate::validate::ValidationConfig) -> Self {
124 self.validation = Some(cfg);
125 self
126 }
127
128 pub fn drain_validation(&mut self) -> crate::validate::ValidationReport {
132 std::mem::take(&mut self.pending_validation)
133 }
134
135 pub fn catalog_type_exists(&self, component_type: &str) -> bool {
137 self.catalogs.values()
138 .any(|cat| cat.components.contains_key(component_type))
139 }
140
141 pub fn registered_catalog_ids(&self) -> Vec<String> {
143 self.catalogs.keys().cloned().collect()
144 }
145
146 pub fn register_inline_catalog(&mut self, json: serde_json::Value) -> Result<()> {
155 let parsed = crate::capabilities::parse_inline_catalog(&json)?;
156
157 let mut catalog = Catalog::new(parsed.catalog_id.clone());
158 for func in &parsed.functions {
159 let return_type = crate::catalog::schema_only::parse_return_type(&func.return_type);
160 let schema_func = crate::catalog::schema_only::SchemaOnlyFunction::new(
161 func.name.clone(),
162 return_type,
163 );
164 catalog = catalog.with_function(Box::new(schema_func));
165 }
166
167 self.catalogs.insert(parsed.catalog_id, catalog);
168 Ok(())
169 }
170
171 pub fn register_action(
177 &mut self,
178 surface_id: &str,
179 action_id: &str,
180 response_path: Option<String>,
181 ) -> Result<()> {
182 let surface = self
183 .model
184 .get_surface_mut(surface_id)
185 .ok_or_else(|| A2uiError::SurfaceNotFound(surface_id.to_string()))?;
186 surface
187 .pending_actions
188 .borrow_mut()
189 .insert(
190 action_id.to_string(),
191 crate::model::surface_model::PendingAction {
192 action_id: action_id.to_string(),
193 response_path,
194 },
195 );
196 Ok(())
197 }
198
199 pub fn load_sample(json: &str) -> Result<(String, String, Vec<A2uiMessage>)> {
201 let sample: serde_json::Value = serde_json::from_str(json)?;
202
203 let name = sample["name"].as_str().unwrap_or("unnamed").to_string();
204 let description = sample["description"].as_str().unwrap_or("").to_string();
205
206 let messages_val = sample["messages"].as_array().ok_or_else(|| {
207 A2uiError::Validation("sample missing 'messages' array".into())
208 })?;
209
210 let messages: Vec<A2uiMessage> = messages_val
211 .iter()
212 .filter_map(|v| serde_json::from_value(v.clone()).ok())
213 .collect();
214
215 Ok((name, description, messages))
216 }
217
218 fn handle_create_surface(&mut self, data: &CreateSurfaceData) -> Result<()> {
223 if self.model.get_surface(&data.surface_id).is_some() {
225 return Err(A2uiError::SurfaceExists(data.surface_id.clone()));
226 }
227
228 let mut surface = SurfaceModel::new(
229 data.surface_id.clone(),
230 data.catalog_id.clone(),
231 data.surface_properties.clone(),
232 data.send_data_model,
233 );
234
235 if let Some(dm) = &data.data_model {
237 surface = surface.with_data_model(dm.clone());
238 }
239
240 if let Some(components) = &data.components {
242 surface.components.borrow_mut().add_from_json(components);
243 }
244
245 if let Some(cfg) = self.validation {
249 if let Some(components) = &data.components {
250 self.run_payload_validation(components, cfg);
251 }
252 }
253
254 self.model.add_surface(surface)
255 }
256
257 fn handle_update_components(&mut self, data: &UpdateComponentsData) -> Result<()> {
258 let surface = self.model.get_surface_mut(&data.surface_id)
263 .ok_or_else(|| A2uiError::SurfaceNotFound(data.surface_id.clone()))?;
264
265 surface.components.borrow_mut().add_from_json(&data.components);
266
267 if let Some(cfg) = self.validation {
269 self.run_payload_validation(&data.components, cfg);
270 }
271
272 Ok(())
273 }
274
275 fn handle_update_data_model(&mut self, data: &UpdateDataModelData) -> Result<()> {
276 let surface = self.model.get_surface_mut(&data.surface_id)
277 .ok_or_else(|| A2uiError::SurfaceNotFound(data.surface_id.clone()))?;
278
279 let path = data.path.as_deref().unwrap_or("/");
280 let value = data.value.clone().unwrap_or(serde_json::Value::Null);
281
282 if path == "/" || path.is_empty() {
283 if value.is_null() {
284 surface.data_model.borrow_mut().replace_all(serde_json::json!({}));
285 } else {
286 surface.data_model.borrow_mut().replace_all(value);
287 }
288 } else {
289 surface.data_model.borrow_mut().set(path, value);
290 }
291 Ok(())
292 }
293
294 fn handle_delete_surface(&mut self, data: &DeleteSurfaceData) -> Result<()> {
295 self.model.delete_surface(&data.surface_id)
296 }
297
298 fn handle_call_function(&mut self, payload: &CallFunctionPayload) -> Result<()> {
299 let fc = &payload.call_function;
300 let call_id = &payload.function_call_id;
301
302 let mut found_func: Option<&dyn crate::catalog::function_api::FunctionImplementation> = None;
304 let mut found_functions_map: Option<&std::collections::HashMap<String, Box<dyn crate::catalog::function_api::FunctionImplementation>>> = None;
305
306 for catalog in self.catalogs.values() {
307 if let Some(f) = catalog.get_function(&fc.call) {
308 found_func = Some(f);
309 found_functions_map = Some(&catalog.functions);
310 break;
311 }
312 }
313
314 let Some(func) = found_func else {
316 self.queue_outgoing(ClientMessage {
317 version: "v1.0".to_string(),
318 payload: ClientPayload::Error(ErrorPayload {
319 error: ErrorData {
320 code: "INVALID_FUNCTION_CALL".to_string(),
321 message: format!("function not found: {}", fc.call),
322 surface_id: None,
323 function_call_id: Some(call_id.clone()),
324 },
325 }),
326 });
327 return Ok(());
328 };
329
330 let execution_result: std::result::Result<serde_json::Value, A2uiError> = {
334 let empty_dm;
335 let data_model: &DataModel = match self.model.surfaces().next() {
336 Some(surface) => &surface.data_model.borrow(),
337 None => {
338 empty_dm = DataModel::new();
339 &empty_dm
340 }
341 };
342 let functions_map = found_functions_map.unwrap();
343 let ctx = crate::model::data_context::DataContext::new(data_model, functions_map);
344
345 let mut resolved_args = HashMap::new();
347 for (key, val) in &fc.args {
348 let resolved = ctx.resolve_dynamic_value(
349 &serde_json::from_value::<crate::protocol::common_types::DynamicValue>(val.clone())
350 .unwrap_or(crate::protocol::common_types::DynamicValue::String(val.to_string())),
351 );
352 resolved_args.insert(key.clone(), resolved);
353 }
354
355 func.execute(&resolved_args, &ctx)
357 };
358 match execution_result {
362 Ok(result) => {
363 if payload.want_response {
364 self.queue_outgoing(ClientMessage {
365 version: "v1.0".to_string(),
366 payload: ClientPayload::FunctionResponse(FunctionResponsePayload {
367 function_response: FunctionResponseData {
368 function_call_id: call_id.clone(),
369 call: fc.call.clone(),
370 value: result,
371 },
372 }),
373 });
374 }
375 }
376 Err(e) => {
377 self.queue_outgoing(ClientMessage {
378 version: "v1.0".to_string(),
379 payload: ClientPayload::Error(ErrorPayload {
380 error: ErrorData {
381 code: "INVALID_FUNCTION_CALL".to_string(),
382 message: e.to_string(),
383 surface_id: None,
384 function_call_id: Some(call_id.clone()),
385 },
386 }),
387 });
388 }
389 }
390
391 Ok(())
392 }
393
394 fn handle_action_response(
395 &mut self,
396 payload: &crate::protocol::server_to_client::ActionResponsePayload,
397 ) -> Result<()> {
398 let action_id = &payload.action_id;
399
400 for surface in self.model.surfaces_mut() {
402 let pending = surface.pending_actions.borrow_mut().remove(action_id);
403 if let Some(pa) = pending {
404 if let Some(ref path) = pa.response_path {
405 if let Some(ref value) = payload.action_response.value {
406 surface.data_model.borrow_mut().set(path, value.clone());
407 }
408 }
409 return Ok(());
410 }
411 }
412
413 Ok(())
416 }
417
418 fn queue_outgoing(&mut self, msg: ClientMessage) {
420 self.outgoing_messages.push(msg);
421 }
422
423 fn run_payload_validation(
428 &mut self,
429 components: &[serde_json::Value],
430 cfg: crate::validate::ValidationConfig,
431 ) {
432 use crate::validate::{RefFieldSpec, ROOT_ID};
433
434 let spec = RefFieldSpec::DEFAULT;
435 let mut report = crate::validate::validate_component_integrity(
436 components,
437 &spec,
438 ROOT_ID,
439 cfg.allow_dangling_references,
440 cfg.allow_missing_root,
441 );
442 let (_, topo) = crate::validate::analyze_topology(
443 components,
444 &spec,
445 ROOT_ID,
446 cfg.allow_orphan_components,
447 cfg.allow_missing_root,
448 );
449 report.extend(topo);
450 self.pending_validation.extend(report);
451 }
452}
453
454#[cfg(test)]
455mod tests {
456 use super::*;
457
458 fn make_processor() -> MessageProcessor {
459 MessageProcessor::new(vec![])
460 }
461
#[test]
fn test_create_and_delete_surface() {
    // A surface exists after `createSurface` and is gone after `deleteSurface`.
    let mut processor = make_processor();

    let create_json = serde_json::json!({
        "version": "v1.0",
        "createSurface": {
            "surfaceId": "test_1",
            "catalogId": "https://example.com/catalog.json"
        }
    })
    .to_string();
    let create_msg = MessageProcessor::parse_message(&create_json).unwrap();
    processor.process_message(create_msg).unwrap();

    assert!(processor.model.get_surface("test_1").is_some());

    let delete_json = serde_json::json!({
        "version": "v1.0",
        "deleteSurface": {
            "surfaceId": "test_1"
        }
    })
    .to_string();
    let delete_msg = MessageProcessor::parse_message(&delete_json).unwrap();
    processor.process_message(delete_msg).unwrap();

    assert!(processor.model.get_surface("test_1").is_none());
}
489
#[test]
fn test_update_components() {
    // Components sent via `updateComponents` end up on the surface.
    let mut processor = make_processor();

    let create_json = serde_json::json!({
        "version": "v1.0",
        "createSurface": {
            "surfaceId": "s1",
            "catalogId": "test"
        }
    })
    .to_string();
    let create_msg = MessageProcessor::parse_message(&create_json).unwrap();
    processor.process_message(create_msg).unwrap();

    let update_json = serde_json::json!({
        "version": "v1.0",
        "updateComponents": {
            "surfaceId": "s1",
            "components": [
                {"id": "root", "component": "Column", "children": ["hello"]},
                {"id": "hello", "component": "Text", "text": "Hello World"}
            ]
        }
    })
    .to_string();
    let update_msg = MessageProcessor::parse_message(&update_json).unwrap();
    processor.process_message(update_msg).unwrap();

    let surface = processor.model.get_surface("s1").unwrap();
    let registry = surface.components.borrow();
    assert!(registry.contains("root"));
    assert!(registry.contains("hello"));

    let hello = registry.get("hello").unwrap();
    assert_eq!(hello.component_type, "Text");
}
523
#[test]
fn test_update_data_model() {
    // A path-scoped `updateDataModel` overwrites the value set at creation.
    let mut processor = make_processor();

    let create_json = serde_json::json!({
        "version": "v1.0",
        "createSurface": {
            "surfaceId": "s1",
            "catalogId": "test",
            "dataModel": {"name": "Alice"}
        }
    })
    .to_string();
    let create_msg = MessageProcessor::parse_message(&create_json).unwrap();
    processor.process_message(create_msg).unwrap();

    let update_json = serde_json::json!({
        "version": "v1.0",
        "updateDataModel": {
            "surfaceId": "s1",
            "path": "/name",
            "value": "Bob"
        }
    })
    .to_string();
    let update_msg = MessageProcessor::parse_message(&update_json).unwrap();
    processor.process_message(update_msg).unwrap();

    let surface = processor.model.get_surface("s1").unwrap();
    let expected = serde_json::json!("Bob");
    assert_eq!(surface.data_model.borrow().get("/name"), Some(&expected));
}
555
#[test]
fn test_duplicate_surface_error() {
    // Creating the same surface id twice must fail on the second attempt.
    let mut processor = make_processor();

    let create_json = serde_json::json!({
        "version": "v1.0",
        "createSurface": {
            "surfaceId": "dup",
            "catalogId": "test"
        }
    })
    .to_string();

    let first = MessageProcessor::parse_message(&create_json).unwrap();
    processor.process_message(first).unwrap();

    let second = MessageProcessor::parse_message(&create_json).unwrap();
    let outcome = processor.process_message(second);
    assert!(outcome.is_err());
}
572
#[test]
fn test_parse_jsonl() {
    // The blank first and last lines are skipped; both messages parse.
    let jsonl = r#"
{"version":"v1.0","createSurface":{"surfaceId":"main","catalogId":"test"}}
{"version":"v1.0","updateComponents":{"surfaceId":"main","components":[{"id":"root","component":"Text","text":"Hi"}]}}
"#;
    let parsed = MessageProcessor::parse_jsonl(jsonl);
    assert_eq!(parsed.len(), 2);

    let (first, second) = (&parsed[0], &parsed[1]);
    assert!(first.is_ok());
    assert!(second.is_ok());
}
584
#[test]
fn test_spec_simple_text_sample() {
    // Loads a two-message sample, checks its metadata, replays it and checks
    // the resulting root component.
    let sample = r#"{
        "name": "Simple Text",
        "description": "Basic text rendering",
        "messages": [
            {
                "version": "v1.0",
                "createSurface": {
                    "surfaceId": "example_1",
                    "catalogId": "https://a2ui.org/specification/v1_0/catalogs/minimal/catalog.json"
                }
            },
            {
                "version": "v1.0",
                "updateComponents": {
                    "surfaceId": "example_1",
                    "components": [
                        {"id": "root", "component": "Text", "text": "Hello, Minimal Catalog!", "variant": "h1"}
                    ]
                }
            }
        ]
    }"#;

    let (name, desc, messages) = MessageProcessor::load_sample(sample).unwrap();
    assert_eq!(name, "Simple Text");
    // Previously bound but never checked.
    assert_eq!(desc, "Basic text rendering");
    assert_eq!(messages.len(), 2);

    let mut proc = make_processor();
    let results = proc.process_messages(messages);
    assert!(results.iter().all(|r| r.is_ok()));

    let surface = proc.model.get_surface("example_1").unwrap();
    let components = surface.components.borrow();
    let root = components.get("root").unwrap();
    assert_eq!(root.component_type, "Text");
    assert_eq!(root.get_raw("text").unwrap(), "Hello, Minimal Catalog!");
    assert_eq!(root.get_raw("variant").unwrap(), "h1");
}
625
#[test]
fn test_spec_login_form_sample() {
    // Replays the login-form sample and checks the surface flag, the
    // component count, the root's children and the submit button's action.
    let sample = r#"{
        "name": "Login Form",
        "description": "Form with input fields and action",
        "messages": [
            {
                "version": "v1.0",
                "createSurface": {
                    "surfaceId": "example_4",
                    "catalogId": "https://a2ui.org/specification/v1_0/catalogs/minimal/catalog.json",
                    "sendDataModel": true
                }
            },
            {
                "version": "v1.0",
                "updateComponents": {
                    "surfaceId": "example_4",
                    "components": [
                        {"id": "root", "component": "Column", "children": ["form_title", "username_field", "password_field", "submit_button"], "justify": "start", "align": "stretch"},
                        {"id": "form_title", "component": "Text", "text": "Login", "variant": "h2"},
                        {"id": "username_field", "component": "TextField", "label": "Username", "value": {"path": "/username"}, "variant": "shortText"},
                        {"id": "password_field", "component": "TextField", "label": "Password", "value": {"path": "/password"}, "variant": "obscured"},
                        {"id": "submit_button", "component": "Button", "child": "submit_label", "variant": "primary", "action": {"event": {"name": "login_submitted", "context": {"user": {"path": "/username"}, "pass": {"path": "/password"}}}}},
                        {"id": "submit_label", "component": "Text", "text": "Sign In"}
                    ]
                }
            }
        ]
    }"#;

    let (_name, _desc, messages) = MessageProcessor::load_sample(sample).unwrap();
    assert_eq!(messages.len(), 2);

    let mut processor = make_processor();
    let outcomes = processor.process_messages(messages);
    assert!(outcomes.iter().all(|outcome| outcome.is_ok()));

    let surface = processor.model.get_surface("example_4").unwrap();
    assert!(surface.send_data_model);

    let registry = surface.components.borrow();
    assert_eq!(registry.len(), 6);

    let root = registry.get("root").unwrap();
    assert_eq!(root.component_type, "Column");
    let crate::protocol::common_types::ChildList::Static(ids) = root.children().unwrap() else {
        panic!("expected static children")
    };
    assert_eq!(ids.len(), 4);
    assert_eq!(ids[0], "form_title");

    let submit = registry.get("submit_button").unwrap();
    assert_eq!(submit.component_type, "Button");
    assert!(submit.action().is_some());
}
685
#[test]
fn test_validation_hook_reports_and_still_loads() {
    // With strict validation on, a dangling child reference is reported but
    // the surface and its components are still loaded.
    let strict = crate::validate::STRICT_VALIDATION;
    let mut processor = MessageProcessor::new(vec![]).with_validation(strict);

    let create_json = serde_json::json!({
        "version": "v1.0",
        "createSurface": {
            "surfaceId": "s1",
            "catalogId": "test",
            "components": [
                {"id": "root", "component": "Column", "children": ["ghost"]}
            ]
        }
    })
    .to_string();
    let create_msg = MessageProcessor::parse_message(&create_json).unwrap();
    processor.process_message(create_msg).unwrap();

    let lookup = processor.model.get_surface("s1");
    assert!(lookup.is_some());
    let surface = lookup.unwrap();
    assert!(surface.components.borrow().contains("root"));

    let first_report = processor.drain_validation();
    assert!(!first_report.is_empty());
    assert!(first_report.has_code(&crate::validate::ValidationErrorCode::DanglingReference));

    // The hook also runs for `updateComponents` payloads.
    let update_json = serde_json::json!({
        "version": "v1.0",
        "updateComponents": {
            "surfaceId": "s1",
            "components": [
                {"id": "root2", "component": "Column", "child": "also_ghost"}
            ]
        }
    })
    .to_string();
    let update_msg = MessageProcessor::parse_message(&update_json).unwrap();
    processor.process_message(update_msg).unwrap();

    let second_report = processor.drain_validation();
    assert!(!second_report.is_empty());
}
732
#[test]
fn test_default_processor_has_empty_validation() {
    // Without `with_validation`, the same dangling reference yields no
    // findings and the surface still loads.
    let mut processor = MessageProcessor::new(vec![]);

    let create_json = serde_json::json!({
        "version": "v1.0",
        "createSurface": {
            "surfaceId": "s1",
            "catalogId": "test",
            "components": [
                {"id": "root", "component": "Column", "children": ["ghost"]}
            ]
        }
    })
    .to_string();
    let create_msg = MessageProcessor::parse_message(&create_json).unwrap();
    processor.process_message(create_msg).unwrap();

    assert!(processor.model.get_surface("s1").is_some());

    let surface = processor.model.get_surface("s1").unwrap();
    let has_root = surface.components.borrow().contains("root");
    assert!(has_root);

    let report = processor.drain_validation();
    assert!(report.is_empty());
}
764
765}