Skip to main content

slop_ai/
server.rs

1//! SLOP server — manages registrations, connections, subscriptions, and message routing.
2
3use std::collections::HashMap;
4use std::sync::{Arc, RwLock};
5
6use serde_json::{json, Value};
7
8use crate::descriptor::ActionHandler;
9use crate::diff::diff_nodes;
10use crate::error::Result;
11use crate::scaling::{get_subtree, prepare_tree, OutputTreeOptions};
12use crate::tree::assemble_tree;
13use crate::types::SlopNode;
14
15/// A connected consumer.
16pub trait Connection: Send + Sync {
17    fn send(&self, message: &Value) -> Result<()>;
18    fn close(&self) -> Result<()>;
19}
20
21struct Subscription {
22    id: String,
23    path: String,
24    depth: Option<usize>,
25    max_nodes: Option<usize>,
26    filter_types: Option<Vec<String>>,
27    filter_min_salience: Option<f64>,
28    connection: Arc<dyn Connection>,
29    last_tree: Option<SlopNode>,
30    /// Per-subscription sequence number. See spec/core/messages.md.
31    seq: u64,
32}
33
34/// Options for action registration.
35pub struct ActionOptions {
36    pub label: Option<String>,
37    pub description: Option<String>,
38    pub dangerous: bool,
39    pub idempotent: bool,
40    pub estimate: Option<String>,
41    pub params: Option<Value>,
42}
43
44impl ActionOptions {
45    pub fn new() -> Self {
46        Self {
47            label: None,
48            description: None,
49            dangerous: false,
50            idempotent: false,
51            estimate: None,
52            params: None,
53        }
54    }
55
56    pub fn label(mut self, label: impl Into<String>) -> Self {
57        self.label = Some(label.into());
58        self
59    }
60
61    pub fn description(mut self, desc: impl Into<String>) -> Self {
62        self.description = Some(desc.into());
63        self
64    }
65
66    pub fn dangerous(mut self, v: bool) -> Self {
67        self.dangerous = v;
68        self
69    }
70
71    pub fn idempotent(mut self, v: bool) -> Self {
72        self.idempotent = v;
73        self
74    }
75
76    pub fn estimate(mut self, est: impl Into<String>) -> Self {
77        self.estimate = Some(est.into());
78        self
79    }
80
81    pub fn params(mut self, params: Value) -> Self {
82        self.params = Some(params);
83        self
84    }
85}
86
87impl Default for ActionOptions {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93type ActionHandlerFn = dyn Fn(&Value) -> Result<Option<Value>> + Send + Sync;
94
95struct Inner {
96    id: String,
97    name: String,
98    static_registrations: HashMap<String, Value>,
99    dynamic_registrations: HashMap<String, Box<dyn Fn() -> Value + Send + Sync>>,
100    action_handlers: HashMap<String, Arc<ActionHandlerFn>>,
101    action_metadata: HashMap<String, Value>,
102    current_tree: SlopNode,
103    current_handlers: HashMap<String, ActionHandler>,
104    version: u64,
105    subscriptions: Vec<Subscription>,
106    connections: Vec<Arc<dyn Connection>>,
107    change_listeners: Vec<Box<dyn Fn() + Send + Sync>>,
108}
109
110/// SLOP server provider.
111///
112/// Manages node registrations, connections, and message routing.
113/// Thread-safe — can be shared across threads via `Clone` (it wraps `Arc`).
114///
115/// # Example
116///
117/// ```
118/// use slop_ai::SlopServer;
119/// use serde_json::json;
120///
121/// let mut slop = SlopServer::new("my-app", "My App");
122/// slop.register("status", json!({"type": "status", "props": {"healthy": true}}));
123/// assert_eq!(slop.version(), 1);
124/// ```
125pub struct SlopServer {
126    inner: Arc<RwLock<Inner>>,
127}
128
129impl Clone for SlopServer {
130    fn clone(&self) -> Self {
131        Self {
132            inner: Arc::clone(&self.inner),
133        }
134    }
135}
136
137impl SlopServer {
138    /// Create a new SLOP server with the given provider ID and name.
139    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
140        let id = id.into();
141        let name = name.into();
142        Self {
143            inner: Arc::new(RwLock::new(Inner {
144                current_tree: SlopNode::new(&id, "root"),
145                id,
146                name,
147                static_registrations: HashMap::new(),
148                dynamic_registrations: HashMap::new(),
149                action_handlers: HashMap::new(),
150                action_metadata: HashMap::new(),
151                current_handlers: HashMap::new(),
152                version: 0,
153                subscriptions: Vec::new(),
154                connections: Vec::new(),
155                change_listeners: Vec::new(),
156            })),
157        }
158    }
159
160    /// Current state tree.
161    pub fn tree(&self) -> SlopNode {
162        self.inner.read().unwrap().current_tree.clone()
163    }
164
165    /// Current version number.
166    pub fn version(&self) -> u64 {
167        self.inner.read().unwrap().version
168    }
169
170    /// Register a static node descriptor at `path`.
171    pub fn register(&self, path: impl Into<String>, descriptor: Value) {
172        let path = path.into();
173        let mut inner = self.inner.write().unwrap();
174        inner.dynamic_registrations.remove(&path);
175        // Merge action metadata into descriptor
176        let merged = merge_action_metadata(&path, descriptor, &inner.action_metadata);
177        inner.static_registrations.insert(path, merged);
178        rebuild(&mut inner);
179    }
180
181    /// Register a descriptor function re-evaluated on `refresh()`.
182    pub fn register_fn<F>(&self, path: impl Into<String>, f: F)
183    where
184        F: Fn() -> Value + Send + Sync + 'static,
185    {
186        let path = path.into();
187        let mut inner = self.inner.write().unwrap();
188        inner.static_registrations.remove(&path);
189        inner.dynamic_registrations.insert(path, Box::new(f));
190        rebuild(&mut inner);
191    }
192
193    /// Register an action handler at `path/name`.
194    pub fn action<F>(&self, path: impl Into<String>, name: impl Into<String>, handler: F)
195    where
196        F: Fn(&Value) -> Result<Option<Value>> + Send + Sync + 'static,
197    {
198        let path = path.into();
199        let name = name.into();
200        let key = if path.is_empty() {
201            name.clone()
202        } else {
203            format!("{path}/{name}")
204        };
205        let mut inner = self.inner.write().unwrap();
206        inner.action_handlers.insert(key.clone(), Arc::new(handler));
207        // Store minimal metadata for the affordance
208        inner
209            .action_metadata
210            .entry(path.clone())
211            .or_insert_with(|| json!({}))
212            .as_object_mut()
213            .unwrap()
214            .insert(name, json!({}));
215        // Re-merge if path is statically registered
216        if let Some(desc) = inner.static_registrations.remove(&path) {
217            let merged = merge_action_metadata(&path, desc, &inner.action_metadata);
218            inner.static_registrations.insert(path, merged);
219            rebuild(&mut inner);
220        }
221    }
222
223    /// Register an action handler with metadata (label, dangerous, etc.).
224    pub fn action_with<F>(
225        &self,
226        path: impl Into<String>,
227        name: impl Into<String>,
228        handler: F,
229        options: ActionOptions,
230    ) where
231        F: Fn(&Value) -> Result<Option<Value>> + Send + Sync + 'static,
232    {
233        let path = path.into();
234        let name = name.into();
235        let key = if path.is_empty() {
236            name.clone()
237        } else {
238            format!("{path}/{name}")
239        };
240        let mut inner = self.inner.write().unwrap();
241        inner.action_handlers.insert(key.clone(), Arc::new(handler));
242
243        let mut meta = serde_json::Map::new();
244        if let Some(label) = &options.label {
245            meta.insert("label".into(), json!(label));
246        }
247        if let Some(desc) = &options.description {
248            meta.insert("description".into(), json!(desc));
249        }
250        if options.dangerous {
251            meta.insert("dangerous".into(), json!(true));
252        }
253        if options.idempotent {
254            meta.insert("idempotent".into(), json!(true));
255        }
256        if let Some(est) = &options.estimate {
257            meta.insert("estimate".into(), json!(est));
258        }
259        if let Some(params) = &options.params {
260            meta.insert("params".into(), params.clone());
261        }
262
263        inner
264            .action_metadata
265            .entry(path.clone())
266            .or_insert_with(|| json!({}))
267            .as_object_mut()
268            .unwrap()
269            .insert(name, Value::Object(meta));
270
271        if let Some(desc) = inner.static_registrations.remove(&path) {
272            let merged = merge_action_metadata(&path, desc, &inner.action_metadata);
273            inner.static_registrations.insert(path, merged);
274            rebuild(&mut inner);
275        }
276    }
277
278    /// Remove the registration at `path`.
279    pub fn unregister(&self, path: &str) {
280        let mut inner = self.inner.write().unwrap();
281        inner.static_registrations.remove(path);
282        inner.dynamic_registrations.remove(path);
283        rebuild(&mut inner);
284    }
285
286    /// Return a scoped server that prefixes all paths.
287    pub fn scope(&self, prefix: impl Into<String>) -> ScopedServer {
288        ScopedServer {
289            server: self.clone(),
290            prefix: prefix.into(),
291        }
292    }
293
294    /// Re-evaluate all dynamic registrations, diff, and broadcast patches.
295    pub fn refresh(&self) {
296        let mut inner = self.inner.write().unwrap();
297        rebuild(&mut inner);
298    }
299
300    // --- Connection lifecycle ---
301
302    /// Handle a new consumer connection.
303    pub fn handle_connection(&self, conn: Arc<dyn Connection>) {
304        let inner = self.inner.read().unwrap();
305        let _ = conn.send(&json!({
306            "type": "hello",
307            "provider": {
308                "id": inner.id,
309                "name": inner.name,
310                "slop_version": "0.1",
311                "capabilities": ["state", "patches", "affordances", "attention", "windowing", "async", "content_refs"]
312            }
313        }));
314        drop(inner);
315        self.inner.write().unwrap().connections.push(conn);
316    }
317
318    /// Emit an event to all connected consumers.
319    pub fn emit_event(&self, name: &str, data: Option<Value>) {
320        let inner = self.inner.read().unwrap();
321        let mut msg = json!({ "type": "event", "name": name });
322        if let Some(d) = data {
323            msg["data"] = d;
324        }
325        for conn in &inner.connections {
326            let _ = conn.send(&msg);
327        }
328    }
329
330    /// Process an incoming message from a consumer.
331    pub fn handle_message(&self, conn: &Arc<dyn Connection>, msg: &Value) {
332        let msg_type = msg["type"].as_str().unwrap_or("");
333        let msg_id = msg["id"].as_str().unwrap_or("").to_string();
334        match msg_type {
335            "subscribe" => {
336                let sub_id = msg_id;
337                let path = msg["path"].as_str().unwrap_or("/").to_string();
338                let depth = parse_depth(msg);
339                let max_nodes = msg
340                    .get("max_nodes")
341                    .and_then(|v| v.as_u64())
342                    .map(|v| v as usize);
343                let filter_types = parse_filter_types(msg);
344                let filter_min_salience = msg
345                    .get("filter")
346                    .and_then(|f| f.get("min_salience"))
347                    .and_then(|v| v.as_f64());
348
349                let inner = self.inner.read().unwrap();
350
351                // Resolve subtree; send error if path not found
352                let output = get_output_tree(
353                    &inner.current_tree,
354                    &path,
355                    depth,
356                    max_nodes,
357                    filter_min_salience,
358                    filter_types.as_deref(),
359                );
360
361                match output {
362                    None => {
363                        let _ = conn.send(&json!({
364                            "type": "error",
365                            "id": sub_id,
366                            "error": {
367                                "code": "not_found",
368                                "message": format!("Path {} does not exist in the state tree", path)
369                            }
370                        }));
371                    }
372                    Some(tree) => {
373                        let _ = conn.send(&json!({
374                            "type": "snapshot",
375                            "id": sub_id,
376                            "version": inner.version,
377                            "seq": 0u64,
378                            "tree": serde_json::to_value(&tree).unwrap()
379                        }));
380                        let last_tree = Some(tree);
381                        drop(inner);
382                        self.inner
383                            .write()
384                            .unwrap()
385                            .subscriptions
386                            .push(Subscription {
387                                id: sub_id,
388                                path,
389                                depth,
390                                max_nodes,
391                                filter_types,
392                                filter_min_salience,
393                                connection: Arc::clone(conn),
394                                last_tree,
395                                seq: 0,
396                            });
397                    }
398                }
399            }
400            "unsubscribe" => {
401                let sub_id = msg["id"].as_str().unwrap_or("");
402                self.inner
403                    .write()
404                    .unwrap()
405                    .subscriptions
406                    .retain(|s| s.id != sub_id);
407            }
408            "query" => {
409                let path = msg["path"].as_str().unwrap_or("/").to_string();
410                let depth = parse_depth(msg);
411                let max_nodes = msg
412                    .get("max_nodes")
413                    .and_then(|v| v.as_u64())
414                    .map(|v| v as usize);
415                let filter_types = parse_filter_types(msg);
416                let filter_min_salience = msg
417                    .get("filter")
418                    .and_then(|f| f.get("min_salience"))
419                    .and_then(|v| v.as_f64());
420                let window = msg.get("window").and_then(|w| {
421                    let arr = w.as_array()?;
422                    if arr.len() == 2 {
423                        Some((arr[0].as_u64()? as usize, arr[1].as_u64()? as usize))
424                    } else {
425                        None
426                    }
427                });
428
429                let inner = self.inner.read().unwrap();
430                let output = get_output_tree(
431                    &inner.current_tree,
432                    &path,
433                    depth,
434                    max_nodes,
435                    filter_min_salience,
436                    filter_types.as_deref(),
437                );
438
439                match output {
440                    None => {
441                        let _ = conn.send(&json!({
442                            "type": "error",
443                            "id": msg_id,
444                            "error": {
445                                "code": "not_found",
446                                "message": format!("Path {} does not exist in the state tree", path)
447                            }
448                        }));
449                    }
450                    Some(mut tree) => {
451                        // Apply window to children
452                        if let Some((offset, count)) = window {
453                            if let Some(children) = &tree.children {
454                                let total = children.len();
455                                let start = offset.min(total);
456                                let end = (offset + count).min(total);
457                                let windowed: Vec<SlopNode> = children[start..end].to_vec();
458                                tree.children = if windowed.is_empty() {
459                                    None
460                                } else {
461                                    Some(windowed)
462                                };
463                                // Record window metadata
464                                let meta = tree.meta.get_or_insert_with(Default::default);
465                                meta.total_children = Some(total);
466                                meta.window = Some((offset, count));
467                            }
468                        }
469                        let _ = conn.send(&json!({
470                            "type": "snapshot",
471                            "id": msg_id,
472                            "version": inner.version,
473                            "tree": serde_json::to_value(&tree).unwrap()
474                        }));
475                    }
476                }
477            }
478            "invoke" => {
479                self.handle_invoke(conn, msg);
480            }
481            _ => {
482                let _ = conn.send(&json!({
483                    "type": "error",
484                    "id": msg_id,
485                    "error": {
486                        "code": "bad_request",
487                        "message": "Unknown message type"
488                    }
489                }));
490            }
491        }
492    }
493
494    /// Handle a consumer disconnect.
495    pub fn handle_disconnect(&self, conn: &Arc<dyn Connection>) {
496        let mut inner = self.inner.write().unwrap();
497        let conn_ptr = Arc::as_ptr(conn);
498        inner.connections.retain(|c| !Arc::ptr_eq(c, conn));
499        inner
500            .subscriptions
501            .retain(|s| !std::ptr::addr_eq(Arc::as_ptr(&s.connection), conn_ptr));
502    }
503
504    /// Register a callback fired after each tree change.
505    pub fn on_change<F: Fn() + Send + Sync + 'static>(&self, callback: F) {
506        self.inner
507            .write()
508            .unwrap()
509            .change_listeners
510            .push(Box::new(callback));
511    }
512
513    /// Close all connections and clean up.
514    pub fn stop(&self) {
515        let mut inner = self.inner.write().unwrap();
516        for conn in &inner.connections {
517            let _ = conn.close();
518        }
519        inner.connections.clear();
520        inner.subscriptions.clear();
521    }
522
523    fn handle_invoke(&self, conn: &Arc<dyn Connection>, msg: &Value) {
524        let path = msg["path"].as_str().unwrap_or("");
525        let action = msg["action"].as_str().unwrap_or("");
526        let params = msg.get("params").cloned().unwrap_or(json!({}));
527        let msg_id = msg["id"].as_str().unwrap_or("").to_string();
528
529        // Clone handler out so we can drop the lock before calling it
530        let handler = {
531            let inner = self.inner.read().unwrap();
532            let handler_key = resolve_handler_key(&inner, path, action);
533            inner
534                .current_handlers
535                .get(&handler_key)
536                .cloned()
537                .or_else(|| inner.action_handlers.get(&handler_key).cloned())
538        };
539
540        match handler {
541            None => {
542                let _ = conn.send(&json!({
543                    "type": "result",
544                    "id": msg_id,
545                    "status": "error",
546                    "error": {"code": "not_found", "message": format!("No handler for {action} at {path}")}
547                }));
548            }
549            Some(h) => {
550                // Spec: providers MUST validate invoke params against the
551                // affordance's declared schema before running the handler.
552                if let Some(schema) = self.resolve_affordance_params(path, action) {
553                    if let Some(err) =
554                        crate::validate_params::validate_params(Some(&schema), &params)
555                    {
556                        let _ = conn.send(&json!({
557                            "type": "result",
558                            "id": msg_id,
559                            "status": "error",
560                            "error": {"code": "invalid_params", "message": err}
561                        }));
562                        return;
563                    }
564                }
565                match h(&params) {
566                    Ok(data) => {
567                        let is_async = data
568                            .as_ref()
569                            .and_then(|d| d.get("__async"))
570                            .and_then(|v| v.as_bool())
571                            .unwrap_or(false);
572                        let mut resp = json!({
573                            "type": "result",
574                            "id": msg_id,
575                            "status": if is_async { "accepted" } else { "ok" }
576                        });
577                        if let Some(d) = &data {
578                            if let Some(obj) = d.as_object() {
579                                let filtered: serde_json::Map<String, Value> = obj
580                                    .iter()
581                                    .filter(|(k, _)| k.as_str() != "__async")
582                                    .map(|(k, v)| (k.clone(), v.clone()))
583                                    .collect();
584                                if !filtered.is_empty() {
585                                    resp["data"] = Value::Object(filtered);
586                                }
587                            }
588                        }
589                        let _ = conn.send(&resp);
590                    }
591                    Err(e) => {
592                        let _ = conn.send(&json!({
593                            "type": "result",
594                            "id": msg_id,
595                            "status": "error",
596                            "error": {"code": "internal", "message": e.to_string()}
597                        }));
598                    }
599                }
600                // Auto-refresh after invoke
601                self.refresh();
602            }
603        }
604    }
605}
606
607impl SlopServer {
608    /// Walk the current tree to find the affordance's `params` schema for
609    /// (path, action). Returns `None` if the node or action is absent.
610    fn resolve_affordance_params(&self, path: &str, action: &str) -> Option<Value> {
611        let inner = self.inner.read().unwrap();
612        let id = &inner.id;
613        let root_prefix = format!("/{id}");
614        let tree_path = if path == root_prefix {
615            "/".to_string()
616        } else if path.starts_with(&format!("{root_prefix}/")) {
617            path[root_prefix.len()..].to_string()
618        } else {
619            path.to_string()
620        };
621        let node = if tree_path == "/" {
622            Some(&inner.current_tree)
623        } else {
624            find_node_by_path(&inner.current_tree, &tree_path)
625        };
626        let affordances = node?.affordances.as_ref()?;
627        let aff = affordances.iter().find(|a| a.action == action)?;
628        aff.params.clone()
629    }
630}
631
632fn find_node_by_path<'a>(root: &'a SlopNode, path: &str) -> Option<&'a SlopNode> {
633    let mut current = root;
634    for seg in path.trim_start_matches('/').split('/') {
635        if seg.is_empty() {
636            continue;
637        }
638        let children = current.children.as_ref()?;
639        current = children.iter().find(|c| c.id == seg)?;
640    }
641    Some(current)
642}
643
644/// A scoped view of a `SlopServer` that prefixes all paths.
645pub struct ScopedServer {
646    server: SlopServer,
647    prefix: String,
648}
649
650impl ScopedServer {
651    pub fn register(&self, path: &str, descriptor: Value) {
652        self.server
653            .register(format!("{}/{path}", self.prefix), descriptor);
654    }
655
656    pub fn register_fn<F>(&self, path: &str, f: F)
657    where
658        F: Fn() -> Value + Send + Sync + 'static,
659    {
660        self.server
661            .register_fn(format!("{}/{path}", self.prefix), f);
662    }
663
664    pub fn action<F>(&self, path: &str, name: impl Into<String>, handler: F)
665    where
666        F: Fn(&Value) -> Result<Option<Value>> + Send + Sync + 'static,
667    {
668        self.server
669            .action(format!("{}/{path}", self.prefix), name, handler);
670    }
671
672    pub fn unregister(&self, path: &str) {
673        self.server.unregister(&format!("{}/{path}", self.prefix));
674    }
675
676    pub fn scope(&self, sub_prefix: &str) -> ScopedServer {
677        self.server.scope(format!("{}/{sub_prefix}", self.prefix))
678    }
679
680    pub fn refresh(&self) {
681        self.server.refresh();
682    }
683}
684
685// --- Internal helpers ---
686
687fn rebuild(inner: &mut Inner) {
688    let mut all_descriptors: HashMap<String, Value> = HashMap::new();
689
690    // Evaluate dynamic registrations
691    for (path, f) in &inner.dynamic_registrations {
692        let desc = f();
693        let merged = merge_action_metadata(path, desc, &inner.action_metadata);
694        all_descriptors.insert(path.clone(), merged);
695    }
696
697    // Static registrations
698    for (path, desc) in &inner.static_registrations {
699        all_descriptors.insert(path.clone(), desc.clone());
700    }
701
702    let (tree, handlers) = assemble_tree(&all_descriptors, &inner.id, &inner.name);
703    let ops = diff_nodes(&inner.current_tree, &tree, "");
704
705    // Merge descriptor handlers with explicitly registered action handlers
706    let merged_handlers = handlers;
707    // action_handlers take precedence (registered via .action())
708    // but we can't move out of inner, so we skip merging here — lookups check both maps.
709    inner.current_handlers = merged_handlers;
710
711    if !ops.is_empty() {
712        inner.current_tree = tree;
713        inner.version += 1;
714        broadcast_patches(inner);
715        for listener in &inner.change_listeners {
716            listener();
717        }
718    } else if inner.version == 0 {
719        inner.current_tree = tree;
720        inner.version = 1;
721    }
722}
723
724fn broadcast_patches(inner: &mut Inner) {
725    let version = inner.version;
726    for sub in &mut inner.subscriptions {
727        // Compute per-subscription output tree using stored path/depth/filter
728        let new_tree = get_output_tree(
729            &inner.current_tree,
730            &sub.path,
731            sub.depth,
732            sub.max_nodes,
733            sub.filter_min_salience,
734            sub.filter_types.as_deref(),
735        );
736
737        let new_tree = match new_tree {
738            Some(t) => t,
739            None => continue, // path no longer exists — skip
740        };
741
742        let ops = match &sub.last_tree {
743            Some(old) => diff_nodes(old, &new_tree, ""),
744            None => diff_nodes(&SlopNode::new(&inner.id, "root"), &new_tree, ""),
745        };
746        if !ops.is_empty() {
747            sub.seq += 1;
748            let ops_val = serde_json::to_value(&ops).unwrap();
749            let _ = sub.connection.send(&json!({
750                "type": "patch",
751                "subscription": sub.id,
752                "version": version,
753                "seq": sub.seq,
754                "ops": ops_val
755            }));
756        }
757        sub.last_tree = Some(new_tree);
758    }
759}
760
761fn resolve_handler_key(inner: &Inner, path: &str, action: &str) -> String {
762    let root_prefix = format!("/{}/", inner.id);
763    let clean = if path.starts_with(&root_prefix) {
764        &path[root_prefix.len()..]
765    } else if let Some(stripped) = path.strip_prefix('/') {
766        stripped
767    } else {
768        path
769    };
770
771    if clean.is_empty() {
772        action.to_string()
773    } else {
774        format!("{clean}/{action}")
775    }
776}
777
778/// Resolve a subtree at `path`, then apply depth/filter/max_nodes via `prepare_tree`.
779/// Returns `None` if the path does not exist.
780fn get_output_tree(
781    full_tree: &SlopNode,
782    path: &str,
783    depth: Option<usize>,
784    max_nodes: Option<usize>,
785    min_salience: Option<f64>,
786    types: Option<&[String]>,
787) -> Option<SlopNode> {
788    let subtree = if path.is_empty() || path == "/" {
789        full_tree
790    } else {
791        get_subtree(full_tree, path)?
792    };
793
794    let opts = OutputTreeOptions {
795        max_depth: depth,
796        max_nodes,
797        min_salience,
798        types: types.map(|t| t.to_vec()),
799    };
800    Some(prepare_tree(subtree, &opts))
801}
802
803/// Parse the `depth` field from a subscribe/query message.
804/// Returns `None` for unlimited (-1 or absent).
805fn parse_depth(msg: &Value) -> Option<usize> {
806    match msg.get("depth").and_then(|v| v.as_i64()) {
807        Some(d) if d >= 0 => Some(d as usize),
808        _ => None,
809    }
810}
811
812/// Parse the `filter.types` array from a subscribe/query message.
813fn parse_filter_types(msg: &Value) -> Option<Vec<String>> {
814    msg.get("filter")
815        .and_then(|f| f.get("types"))
816        .and_then(|t| t.as_array())
817        .map(|arr| {
818            arr.iter()
819                .filter_map(|v| v.as_str().map(String::from))
820                .collect()
821        })
822}
823
824fn merge_action_metadata(
825    path: &str,
826    mut descriptor: Value,
827    action_metadata: &HashMap<String, Value>,
828) -> Value {
829    if let Some(meta) = action_metadata.get(path) {
830        if let Some(meta_obj) = meta.as_object() {
831            if !meta_obj.is_empty() {
832                let desc_obj = descriptor.as_object_mut().unwrap();
833                // If the descriptor already defines actions, treat it as
834                // authoritative — don't add registered actions that aren't
835                // listed. This supports state-dependent affordances where
836                // the descriptor intentionally omits certain actions.
837                if desc_obj.contains_key("actions") {
838                    // Only enrich existing actions with metadata, don't add new ones
839                    let actions = desc_obj["actions"].as_object_mut().unwrap();
840                    for (name, opts) in meta_obj {
841                        if actions.contains_key(name) {
842                            // Merge metadata into existing action (fill gaps)
843                            if let (Some(existing), Some(new)) = (
844                                actions.get_mut(name).and_then(|v| v.as_object_mut()),
845                                opts.as_object(),
846                            ) {
847                                for (k, v) in new {
848                                    if !existing.contains_key(k) {
849                                        existing.insert(k.clone(), v.clone());
850                                    }
851                                }
852                            }
853                        }
854                    }
855                } else {
856                    // No actions in descriptor — add all registered actions
857                    let actions = desc_obj
858                        .entry("actions")
859                        .or_insert_with(|| json!({}))
860                        .as_object_mut()
861                        .unwrap();
862                    for (name, opts) in meta_obj {
863                        if !actions.contains_key(name) {
864                            actions.insert(name.clone(), opts.clone());
865                        }
866                    }
867                }
868            }
869        }
870    }
871    descriptor
872}
873
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// In-memory [`Connection`] that records every message sent through it.
    struct MockConnection {
        messages: Mutex<Vec<Value>>,
    }

    impl MockConnection {
        /// Creates an empty recorder wrapped in an `Arc`.
        fn new() -> Arc<Self> {
            let recorder = Self {
                messages: Mutex::new(Vec::new()),
            };
            Arc::new(recorder)
        }

        /// Returns a copy of everything recorded so far, in send order.
        fn messages(&self) -> Vec<Value> {
            let log = self.messages.lock().unwrap();
            log.clone()
        }
    }

    impl Connection for MockConnection {
        /// Appends a clone of `message` to the log; never fails.
        fn send(&self, message: &Value) -> Result<()> {
            let mut log = self.messages.lock().unwrap();
            log.push(message.clone());
            Ok(())
        }

        /// Closing is a no-op for the mock.
        fn close(&self) -> Result<()> {
            Ok(())
        }
    }

    /// Produces a trait-object handle that shares ownership with `conn`.
    fn as_dyn(conn: &Arc<MockConnection>) -> Arc<dyn Connection> {
        let shared: Arc<MockConnection> = Arc::clone(conn);
        shared
    }

    /// Returns the first logged message whose `type` field equals `kind`.
    /// Panics when no such message was recorded.
    fn first_of_type<'a>(log: &'a [Value], kind: &str) -> &'a Value {
        log.iter().find(|m| m["type"] == kind).unwrap()
    }

    /// A static registration bumps the version and appears as a root child.
    #[test]
    fn test_register_static() {
        let server = SlopServer::new("app", "App");
        let descriptor = json!({"type": "status", "props": {"healthy": true}});
        server.register("status", descriptor);
        assert_eq!(server.version(), 1);

        let tree = server.tree();
        let kids = tree.children.as_ref().unwrap();
        assert_eq!(kids.len(), 1);
        assert_eq!(kids[0].id, "status");
    }

    /// A descriptor function is re-evaluated when `refresh` is called.
    #[test]
    fn test_register_fn() {
        let counter = Arc::new(Mutex::new(0));
        let server = SlopServer::new("app", "App");

        let shared = counter.clone();
        server.register_fn("counter", move || {
            let current = *shared.lock().unwrap();
            json!({"type": "status", "props": {"count": current}})
        });

        {
            let tree = server.tree();
            let node = &tree.children.as_ref().unwrap()[0];
            assert_eq!(node.properties.as_ref().unwrap()["count"], 0);
        }

        *counter.lock().unwrap() = 5;
        server.refresh();

        {
            let tree = server.tree();
            let node = &tree.children.as_ref().unwrap()[0];
            assert_eq!(node.properties.as_ref().unwrap()["count"], 5);
        }
    }

    /// Connect → hello, subscribe → snapshot, query → snapshot, then disconnect.
    #[test]
    fn test_connection_lifecycle() {
        let server = SlopServer::new("app", "App");
        server.register("x", json!({"type": "group"}));

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        let log = mock.messages();
        assert_eq!(log.len(), 1);
        let hello = &log[0];
        assert_eq!(hello["type"], "hello");
        assert_eq!(hello["provider"]["id"], "app");

        // Subscribe
        let subscribe = json!({"type": "subscribe", "id": "sub-1"});
        server.handle_message(&link, &subscribe);
        let log = mock.messages();
        let sub_reply = &log[1];
        assert_eq!(sub_reply["type"], "snapshot");
        assert_eq!(sub_reply["id"], "sub-1");

        // Query
        let query = json!({"type": "query", "id": "q-1"});
        server.handle_message(&link, &query);
        let log = mock.messages();
        let query_reply = &log[2];
        assert_eq!(query_reply["type"], "snapshot");
        assert_eq!(query_reply["id"], "q-1");

        // Disconnect
        server.handle_disconnect(&link);
    }

    /// Invoking a registered action runs its handler and reports `ok`.
    #[test]
    fn test_invoke() {
        let state = Arc::new(Mutex::new(0i32));
        let server = SlopServer::new("app", "App");
        server.register("counter", json!({"type": "status", "props": {"count": 0}}));

        let tally = state.clone();
        server.action("counter", "increment", move |_params: &Value| {
            *tally.lock().unwrap() += 1;
            Ok(None)
        });

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        let request = json!({
            "type": "invoke",
            "id": "inv-1",
            "path": "/app/counter",
            "action": "increment"
        });
        server.handle_message(&link, &request);

        let log = mock.messages();
        let result = first_of_type(&log, "result");
        assert_eq!(result["status"], "ok");
        assert_eq!(*state.lock().unwrap(), 1);
    }

    /// Invoking an action on an unregistered path yields a `not_found` error result.
    #[test]
    fn test_invoke_not_found() {
        let server = SlopServer::new("app", "App");
        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        let request = json!({
            "type": "invoke",
            "id": "inv-1",
            "path": "/app/missing",
            "action": "do_it"
        });
        server.handle_message(&link, &request);

        let log = mock.messages();
        let result = first_of_type(&log, "result");
        assert_eq!(result["status"], "error");
        assert_eq!(result["error"]["code"], "not_found");
    }

    /// Registrations made through a scope are nested under the scope's node.
    #[test]
    fn test_scope() {
        let server = SlopServer::new("app", "App");
        let settings = server.scope("settings");
        let account = json!({"type": "group", "props": {"email": "a@b.com"}});
        settings.register("account", account);

        let tree = server.tree();
        let settings_node = &tree.children.as_ref().unwrap()[0];
        assert_eq!(settings_node.id, "settings");

        let nested = settings_node.children.as_ref().unwrap();
        assert_eq!(nested[0].id, "account");
    }

    /// Unregistering the only node leaves the root with no (or empty) children.
    #[test]
    fn test_unregister() {
        let server = SlopServer::new("app", "App");
        server.register("x", json!({"type": "group"}));
        assert_eq!(server.tree().children.as_ref().unwrap().len(), 1);

        server.unregister("x");
        let emptied = match server.tree().children.as_ref() {
            None => true,
            Some(kids) => kids.is_empty(),
        };
        assert!(emptied);
    }

    /// Re-registering a node with new props sends something to subscribers.
    #[test]
    fn test_broadcast_on_change() {
        let server = SlopServer::new("app", "App");
        server.register("x", json!({"type": "group", "props": {"v": 1}}));

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());
        server.handle_message(&link, &json!({"type": "subscribe", "id": "sub-1"}));
        let baseline = mock.messages().len();

        server.register("x", json!({"type": "group", "props": {"v": 2}}));
        let after_change = mock.messages().len();
        assert!(after_change > baseline);
    }

    /// A depth-limited subscription truncates nodes below the limit.
    #[test]
    fn test_subscribe_with_depth_limit() {
        let server = SlopServer::new("app", "App");
        // Three levels, built from flat path registrations.
        server.register("parent", json!({"type": "group"}));
        server.register("parent/child", json!({"type": "group"}));
        server.register("parent/child/grandchild", json!({"type": "item"}));

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        // With depth 1 the root still lists its children, but `parent` itself
        // is expected to come back without its own children.
        let request = json!({
            "type": "subscribe",
            "id": "sub-depth",
            "path": "/",
            "depth": 1
        });
        server.handle_message(&link, &request);

        let log = mock.messages();
        let snapshot = first_of_type(&log, "snapshot");
        assert_eq!(snapshot["id"], "sub-depth");

        let root_children = snapshot["tree"]["children"].as_array().unwrap();
        let parent = root_children
            .iter()
            .find(|node| node["id"] == "parent")
            .unwrap();

        // `parent` must be a stub: no (or null) `children`, with the real
        // child count reported through `meta.total_children`.
        let lacks_children = match parent.get("children") {
            None => true,
            Some(kids) => kids.is_null(),
        };
        assert!(lacks_children);
        assert_eq!(parent["meta"]["total_children"], 1);
    }

    /// `filter.min_salience` drops nodes whose salience is below the threshold.
    #[test]
    fn test_subscribe_with_salience_filter() {
        let server = SlopServer::new("app", "App");
        // One node above and one below the 0.5 threshold used in the request.
        let high = json!({
            "type": "item",
            "meta": {"salience": 0.9}
        });
        let low = json!({
            "type": "item",
            "meta": {"salience": 0.1}
        });
        server.register("high", high);
        server.register("low", low);

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        let request = json!({
            "type": "subscribe",
            "id": "sub-filter",
            "path": "/",
            "filter": {"min_salience": 0.5}
        });
        server.handle_message(&link, &request);

        let log = mock.messages();
        let snapshot = first_of_type(&log, "snapshot");
        let kept = snapshot["tree"]["children"].as_array().unwrap();

        // Only the high-salience node survives the filter.
        assert_eq!(kept.len(), 1);
        assert_eq!(kept[0]["id"], "high");
    }

    /// An unrecognised message type is answered with a `bad_request` error
    /// that echoes the request id.
    #[test]
    fn test_unknown_message_returns_error() {
        let server = SlopServer::new("app", "App");
        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        let request = json!({
            "type": "bogus",
            "id": "req-99"
        });
        server.handle_message(&link, &request);

        let log = mock.messages();
        let error = first_of_type(&log, "error");
        assert_eq!(error["id"], "req-99");
        assert_eq!(error["error"]["code"], "bad_request");
    }

    /// Subscribing to a path that does not exist is answered with `not_found`.
    #[test]
    fn test_subscribe_bad_path_returns_error() {
        let server = SlopServer::new("app", "App");
        server.register("x", json!({"type": "group"}));

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        let request = json!({
            "type": "subscribe",
            "id": "sub-bad",
            "path": "/nonexistent/deep"
        });
        server.handle_message(&link, &request);

        let log = mock.messages();
        let error = first_of_type(&log, "error");
        assert_eq!(error["id"], "sub-bad");
        assert_eq!(error["error"]["code"], "not_found");
    }

    /// An emitted event reaches connected consumers with its name and data.
    #[test]
    fn test_emit_event() {
        let server = SlopServer::new("app", "App");

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        let payload = json!({"from": "/a", "to": "/b"});
        server.emit_event("user-navigation", Some(payload));

        let log = mock.messages();
        let event = first_of_type(&log, "event");
        assert_eq!(event["name"], "user-navigation");
        assert_eq!(event["data"]["from"], "/a");
        assert_eq!(event["data"]["to"], "/b");
    }

    /// An event emitted without data has no `data` key at all.
    #[test]
    fn test_emit_event_no_data() {
        let server = SlopServer::new("app", "App");

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        server.emit_event("heartbeat", None);

        let log = mock.messages();
        let event = first_of_type(&log, "event");
        assert_eq!(event["name"], "heartbeat");
        assert!(event.get("data").is_none());
    }

    /// A windowed query returns only the requested slice of a collection.
    #[test]
    fn test_query_with_window() {
        let server = SlopServer::new("app", "App");
        // A collection whose children are supplied as an `items` array.
        let collection = json!({
            "type": "collection",
            "items": [
                {"id": "a", "type": "item"},
                {"id": "b", "type": "item"},
                {"id": "c", "type": "item"},
                {"id": "d", "type": "item"},
                {"id": "e", "type": "item"}
            ]
        });
        server.register("items", collection);

        let mock = MockConnection::new();
        let link = as_dyn(&mock);
        server.handle_connection(link.clone());

        // Window [1, 2] on /items (a child of the root) is expected to yield
        // exactly items b and c.
        let request = json!({
            "type": "query",
            "id": "q-win",
            "path": "/items",
            "depth": -1,
            "window": [1, 2]
        });
        server.handle_message(&link, &request);

        let log = mock.messages();
        let snapshot = log.iter().find(|m| m["id"] == "q-win").unwrap();
        let windowed = snapshot["tree"]["children"].as_array().unwrap();
        assert_eq!(windowed.len(), 2);
        assert_eq!(windowed[0]["id"], "b");
        assert_eq!(windowed[1]["id"], "c");
        // The full, un-windowed child count is still reported in the metadata.
        assert_eq!(snapshot["tree"]["meta"]["total_children"], 5);
    }
}