1#![allow(clippy::needless_borrow)] #![allow(clippy::unnecessary_cast)] #![allow(clippy::new_without_default)] use async_trait::async_trait;
9use capnweb_core::{
10 il::{ArrayOp, CallOp, CaptureRef, ObjectOp, ParamRef, ResultRef, ValueRef},
11 protocol::{
12 ids::IdAllocator,
13 il_runner::PlanRunner,
14 nested_capabilities::{
15 CapabilityError, CapabilityFactory as CapabilityFactoryTrait, CapabilityGraph,
16 CapabilityMetadata, MethodMetadata, ParameterMetadata,
17 },
18 resume_tokens::{PersistentSessionManager, ResumeToken, ResumeTokenManager},
19 tables::{ExportTable, ImportTable},
20 },
21 CapId, Op, Plan, RpcError, RpcTarget, Source, Value,
22};
23use chrono::Utc;
24use serde_json::{json, Value as JsonValue};
25use std::collections::HashMap;
26use std::sync::Arc;
27use tokio::sync::{Mutex, RwLock};
28
29#[derive(Clone, Debug)]
31struct SessionState {
32 variables: HashMap<String, Value>,
33 operations: Vec<String>,
34 last_result: Option<Value>,
35 #[allow(dead_code)]
36 created_at: i64,
37 last_accessed: i64,
38}
39
40#[derive(Debug, Clone)]
42struct SimpleCapabilityFactory {
43 max_capabilities: usize,
44 created_count: Arc<Mutex<usize>>,
45}
46
47impl SimpleCapabilityFactory {
48 fn new(max_capabilities: usize) -> Self {
49 Self {
50 max_capabilities,
51 created_count: Arc::new(Mutex::new(0)),
52 }
53 }
54}
55
56#[async_trait]
57impl CapabilityFactoryTrait for SimpleCapabilityFactory {
58 async fn create_capability(
59 &self,
60 capability_type: &str,
61 config: Value,
62 ) -> Result<Arc<dyn RpcTarget>, CapabilityError> {
63 let mut count = self.created_count.lock().await;
64 if *count >= self.max_capabilities {
65 return Err(CapabilityError::InvalidConfiguration(format!(
66 "Maximum capabilities limit ({}) exceeded",
67 self.max_capabilities
68 )));
69 }
70 *count += 1;
71
72 let nested_cap = Arc::new(NestedCapabilityImpl {
74 name: capability_type.to_string(),
75 config,
76 parent: None,
77 });
78
79 Ok(nested_cap as Arc<dyn RpcTarget>)
80 }
81
82 fn list_capability_types(&self) -> Vec<String> {
83 vec![
84 "validator".to_string(),
85 "aggregator".to_string(),
86 "processor".to_string(),
87 "analyzer".to_string(),
88 "transformer".to_string(),
89 ]
90 }
91
92 fn get_capability_metadata(&self, capability_type: &str) -> Option<CapabilityMetadata> {
93 match capability_type {
94 "validator" => Some(CapabilityMetadata {
95 name: "validator".to_string(),
96 description: "Validates input data according to rules".to_string(),
97 version: "1.0.0".to_string(),
98 methods: vec![
99 MethodMetadata {
100 name: "validate".to_string(),
101 description: "Validates input against schema".to_string(),
102 parameters: vec![
103 ParameterMetadata {
104 name: "data".to_string(),
105 type_name: "any".to_string(),
106 description: "Data to validate".to_string(),
107 required: true,
108 },
109 ParameterMetadata {
110 name: "rules".to_string(),
111 type_name: "object".to_string(),
112 description: "Validation rules".to_string(),
113 required: false,
114 },
115 ],
116 return_type: "ValidationResult".to_string(),
117 },
118 MethodMetadata {
119 name: "setRules".to_string(),
120 description: "Configure validation rules".to_string(),
121 parameters: vec![ParameterMetadata {
122 name: "rules".to_string(),
123 type_name: "object".to_string(),
124 description: "New validation rules".to_string(),
125 required: true,
126 }],
127 return_type: "void".to_string(),
128 },
129 ],
130 config_schema: Some(Value::Object({
131 let mut obj = std::collections::HashMap::new();
132 obj.insert(
133 "type".to_string(),
134 Box::new(Value::String("object".to_string())),
135 );
136 let mut props = std::collections::HashMap::new();
137 let mut strict_prop = std::collections::HashMap::new();
138 strict_prop.insert(
139 "type".to_string(),
140 Box::new(Value::String("boolean".to_string())),
141 );
142 props.insert("strict".to_string(), Box::new(Value::Object(strict_prop)));
143 let mut allow_prop = std::collections::HashMap::new();
144 allow_prop.insert(
145 "type".to_string(),
146 Box::new(Value::String("boolean".to_string())),
147 );
148 props.insert(
149 "allowExtraFields".to_string(),
150 Box::new(Value::Object(allow_prop)),
151 );
152 let mut depth_prop = std::collections::HashMap::new();
153 depth_prop.insert(
154 "type".to_string(),
155 Box::new(Value::String("number".to_string())),
156 );
157 props.insert("maxDepth".to_string(), Box::new(Value::Object(depth_prop)));
158 obj.insert("properties".to_string(), Box::new(Value::Object(props)));
159 obj
160 })),
161 }),
162 "aggregator" => Some(CapabilityMetadata {
163 name: "aggregator".to_string(),
164 description: "Aggregates and processes data streams".to_string(),
165 version: "1.0.0".to_string(),
166 methods: vec![
167 MethodMetadata {
168 name: "add".to_string(),
169 description: "Add data to aggregation".to_string(),
170 parameters: vec![ParameterMetadata {
171 name: "data".to_string(),
172 type_name: "any".to_string(),
173 description: "Data to aggregate".to_string(),
174 required: true,
175 }],
176 return_type: "void".to_string(),
177 },
178 MethodMetadata {
179 name: "compute".to_string(),
180 description: "Compute aggregation result".to_string(),
181 parameters: vec![],
182 return_type: "AggregationResult".to_string(),
183 },
184 MethodMetadata {
185 name: "reset".to_string(),
186 description: "Reset aggregation state".to_string(),
187 parameters: vec![],
188 return_type: "void".to_string(),
189 },
190 ],
191 config_schema: None,
192 }),
193 "processor" => Some(CapabilityMetadata {
194 name: "processor".to_string(),
195 description: "Processes data through transformation pipeline".to_string(),
196 version: "1.0.0".to_string(),
197 methods: vec![MethodMetadata {
198 name: "process".to_string(),
199 description: "Process input data".to_string(),
200 parameters: vec![ParameterMetadata {
201 name: "input".to_string(),
202 type_name: "any".to_string(),
203 description: "Input data".to_string(),
204 required: true,
205 }],
206 return_type: "ProcessedData".to_string(),
207 }],
208 config_schema: None,
209 }),
210 "analyzer" => Some(CapabilityMetadata {
211 name: "analyzer".to_string(),
212 description: "Analyzes data patterns and metrics".to_string(),
213 version: "1.0.0".to_string(),
214 methods: vec![MethodMetadata {
215 name: "analyze".to_string(),
216 description: "Analyze data".to_string(),
217 parameters: vec![ParameterMetadata {
218 name: "data".to_string(),
219 type_name: "any".to_string(),
220 description: "Data to analyze".to_string(),
221 required: true,
222 }],
223 return_type: "AnalysisResult".to_string(),
224 }],
225 config_schema: None,
226 }),
227 "transformer" => Some(CapabilityMetadata {
228 name: "transformer".to_string(),
229 description: "Transforms data between formats".to_string(),
230 version: "1.0.0".to_string(),
231 methods: vec![MethodMetadata {
232 name: "transform".to_string(),
233 description: "Transform data".to_string(),
234 parameters: vec![
235 ParameterMetadata {
236 name: "input".to_string(),
237 type_name: "any".to_string(),
238 description: "Input data".to_string(),
239 required: true,
240 },
241 ParameterMetadata {
242 name: "format".to_string(),
243 type_name: "string".to_string(),
244 description: "Target format".to_string(),
245 required: false,
246 },
247 ],
248 return_type: "TransformedData".to_string(),
249 }],
250 config_schema: None,
251 }),
252 _ => None,
253 }
254 }
255}
256
257#[derive(Debug)]
259pub struct AdvancedCapability {
260 resume_manager: Arc<ResumeTokenManager>,
262 persistent_manager: Arc<PersistentSessionManager>,
263 sessions: Arc<RwLock<HashMap<String, SessionState>>>,
264
265 capability_graph: Arc<CapabilityGraph>,
267 capability_factory: Arc<SimpleCapabilityFactory>,
268 capability_counter: Arc<Mutex<u64>>,
269
270 plan_runner: Arc<PlanRunner>,
272 plan_cache: Arc<RwLock<HashMap<String, Plan>>>,
273
274 id_allocator: Arc<IdAllocator>,
276 import_table: Arc<ImportTable>,
277 export_table: Arc<ExportTable>,
278
279 call_count: Arc<Mutex<usize>>,
281 nested_capabilities: Arc<RwLock<HashMap<String, Arc<dyn RpcTarget>>>>,
282}
283
284#[derive(Debug, Clone)]
286pub struct AdvancedCapabilityConfig {
287 pub secret_key: Option<Vec<u8>>,
289 pub token_ttl: u64,
291 pub max_session_age: u64,
293 pub max_capabilities: usize,
295 pub max_plan_operations: usize,
297 pub plan_timeout_ms: u64,
299}
300
301impl Default for AdvancedCapabilityConfig {
302 fn default() -> Self {
303 Self {
304 secret_key: None,
305 token_ttl: 3600, max_session_age: 86400, max_capabilities: 1000,
308 max_plan_operations: 1000,
309 plan_timeout_ms: 30000, }
311 }
312}
313
314impl AdvancedCapability {
315 pub fn new() -> Self {
317 Self::with_config(AdvancedCapabilityConfig::default())
318 }
319
320 pub fn with_config(config: AdvancedCapabilityConfig) -> Self {
322 use rand::RngCore;
324 let secret_key = config.secret_key.unwrap_or_else(|| {
325 let mut key = vec![0u8; 32];
326 rand::rng().fill_bytes(&mut key);
327 key
328 });
329
330 let resume_manager = Arc::new(ResumeTokenManager::with_settings(
331 secret_key.clone(),
332 config.token_ttl,
333 config.max_session_age,
334 ));
335
336 let persistent_manager = Arc::new(PersistentSessionManager::new(
337 ResumeTokenManager::with_settings(secret_key, config.token_ttl, config.max_session_age),
338 ));
339
340 let id_allocator = Arc::new(IdAllocator::new());
342 let import_table = Arc::new(ImportTable::new(id_allocator.clone()));
343 let export_table = Arc::new(ExportTable::new(id_allocator.clone()));
344
345 let plan_runner = PlanRunner::with_settings(
347 import_table.clone(),
348 export_table.clone(),
349 config.plan_timeout_ms,
350 config.max_plan_operations,
351 );
352
353 Self {
354 resume_manager,
355 persistent_manager,
356 sessions: Arc::new(RwLock::new(HashMap::new())),
357
358 capability_graph: Arc::new(CapabilityGraph::new()),
359 capability_factory: Arc::new(SimpleCapabilityFactory::new(config.max_capabilities)),
360 capability_counter: Arc::new(Mutex::new(1)),
361
362 plan_runner: Arc::new(plan_runner),
363 plan_cache: Arc::new(RwLock::new(HashMap::new())),
364
365 id_allocator,
366 import_table,
367 export_table,
368
369 call_count: Arc::new(Mutex::new(0)),
370 nested_capabilities: Arc::new(RwLock::new(HashMap::new())),
371 }
372 }
373
374 fn parse_plan(&self, json: &JsonValue) -> Result<Plan, RpcError> {
376 let mut ops = Vec::new();
377 let mut captures = Vec::new();
378
379 if let Some(operations) = json.get("operations").and_then(|o| o.as_array()) {
380 for op_json in operations {
381 let operation = self.parse_operation(op_json)?;
382 ops.push(operation);
383 }
384 }
385
386 if let Some(capture_array) = json.get("captures").and_then(|c| c.as_array()) {
388 for cap_json in capture_array {
389 if let Some(cap_id) = cap_json.as_u64() {
390 captures.push(CapId::new(cap_id as u64));
391 }
392 }
393 }
394
395 let result = if let Some(result_json) = json.get("result") {
397 self.parse_source(Some(result_json))?
398 } else {
399 Source::Result {
400 result: ResultRef {
401 index: ops.len().saturating_sub(1) as u32,
402 },
403 }
404 };
405
406 Ok(Plan {
407 captures,
408 ops,
409 result,
410 })
411 }
412
413 fn parse_operation(&self, json: &JsonValue) -> Result<Op, RpcError> {
415 if let Some(call_obj) = json.get("call") {
417 let target = self.parse_source(call_obj.get("target"))?;
418 let member = call_obj
419 .get("member")
420 .and_then(|m| m.as_str())
421 .ok_or_else(|| RpcError::bad_request("Call missing member"))?
422 .to_string();
423 let args = self.parse_sources(call_obj.get("args"))?;
424 let result = call_obj
425 .get("result")
426 .and_then(|r| r.as_u64())
427 .ok_or_else(|| RpcError::bad_request("Call missing result index"))?
428 as u32;
429
430 Ok(Op::Call {
431 call: CallOp {
432 target,
433 member,
434 args,
435 result,
436 },
437 })
438 }
439 else if let Some(obj_obj) = json.get("object") {
441 let mut fields = std::collections::BTreeMap::new();
442 if let Some(fields_obj) = obj_obj.get("fields").and_then(|f| f.as_object()) {
443 for (key, val) in fields_obj {
444 fields.insert(key.clone(), self.parse_source(Some(val))?);
445 }
446 }
447 let result = obj_obj
448 .get("result")
449 .and_then(|r| r.as_u64())
450 .ok_or_else(|| RpcError::bad_request("Object missing result index"))?
451 as u32;
452
453 Ok(Op::Object {
454 object: ObjectOp { fields, result },
455 })
456 }
457 else if let Some(array_obj) = json.get("array") {
459 let items = self.parse_sources(array_obj.get("items"))?;
460 let result = array_obj
461 .get("result")
462 .and_then(|r| r.as_u64())
463 .ok_or_else(|| RpcError::bad_request("Array missing result index"))?
464 as u32;
465
466 Ok(Op::Array {
467 array: ArrayOp { items, result },
468 })
469 }
470 else if let Some(op_type) = json.get("type").and_then(|t| t.as_str()) {
472 match op_type {
473 "call" => {
474 let target = self.parse_source(json.get("target"))?;
475 let member = json
476 .get("member")
477 .and_then(|m| m.as_str())
478 .ok_or_else(|| RpcError::bad_request("Call missing member"))?
479 .to_string();
480 let args = self.parse_sources(json.get("args"))?;
481 let result = json.get("result").and_then(|r| r.as_u64()).unwrap_or(0) as u32;
482
483 Ok(Op::Call {
484 call: CallOp {
485 target,
486 member,
487 args,
488 result,
489 },
490 })
491 }
492 _ => Err(RpcError::bad_request(format!(
493 "Unknown operation type: {}",
494 op_type
495 ))),
496 }
497 } else {
498 Err(RpcError::bad_request(
499 "Operation must have 'call', 'object', or 'array' field",
500 ))
501 }
502 }
503
504 fn parse_sources(&self, json: Option<&JsonValue>) -> Result<Vec<Source>, RpcError> {
506 if let Some(array) = json.and_then(|j| j.as_array()) {
507 array.iter().map(|s| self.parse_source(Some(s))).collect()
508 } else {
509 Ok(vec![])
510 }
511 }
512
513 fn parse_source(&self, json: Option<&JsonValue>) -> Result<Source, RpcError> {
515 let json = json.ok_or_else(|| RpcError::bad_request("Missing source"))?;
516
517 if let Some(capture_obj) = json.get("capture") {
519 let index = capture_obj
520 .get("index")
521 .and_then(|i| i.as_u64())
522 .ok_or_else(|| RpcError::bad_request("Capture missing index"))?
523 as u32;
524 Ok(Source::Capture {
525 capture: CaptureRef { index },
526 })
527 }
528 else if let Some(result_obj) = json.get("result") {
530 let index = result_obj
531 .get("index")
532 .and_then(|i| i.as_u64())
533 .ok_or_else(|| RpcError::bad_request("Result missing index"))?
534 as u32;
535 Ok(Source::Result {
536 result: ResultRef { index },
537 })
538 }
539 else if let Some(param_obj) = json.get("param") {
541 let path = if let Some(path_array) = param_obj.get("path").and_then(|p| p.as_array()) {
542 path_array
543 .iter()
544 .filter_map(|v| v.as_str().map(String::from))
545 .collect()
546 } else {
547 vec![]
548 };
549 Ok(Source::Param {
550 param: ParamRef { path },
551 })
552 }
553 else if let Some(value_obj) = json.get("byValue") {
555 let value = value_obj
556 .get("value")
557 .ok_or_else(|| RpcError::bad_request("ByValue missing value"))?;
558 Ok(Source::ByValue {
559 by_value: ValueRef {
560 value: value.clone(),
561 },
562 })
563 }
564 else if let Some(source_type) = json.get("type").and_then(|t| t.as_str()) {
566 match source_type {
567 "capture" => {
568 let index = json.get("index").and_then(|i| i.as_u64()).unwrap_or(0) as u32;
569 Ok(Source::Capture {
570 capture: CaptureRef { index },
571 })
572 }
573 "result" => {
574 let index = json.get("index").and_then(|i| i.as_u64()).unwrap_or(0) as u32;
575 Ok(Source::Result {
576 result: ResultRef { index },
577 })
578 }
579 "param" | "parameter" => {
580 let path = if let Some(path_str) = json.get("path").and_then(|p| p.as_str()) {
581 vec![path_str.to_string()]
582 } else if let Some(path_array) = json.get("path").and_then(|p| p.as_array()) {
583 path_array
584 .iter()
585 .filter_map(|v| v.as_str().map(String::from))
586 .collect()
587 } else {
588 vec![]
589 };
590 Ok(Source::Param {
591 param: ParamRef { path },
592 })
593 }
594 "literal" | "value" => {
595 let value = json
596 .get("value")
597 .ok_or_else(|| RpcError::bad_request("Literal source missing value"))?;
598 Ok(Source::ByValue {
599 by_value: ValueRef {
600 value: value.clone(),
601 },
602 })
603 }
604 _ => Err(RpcError::bad_request(format!(
605 "Unknown source type: {}",
606 source_type
607 ))),
608 }
609 } else {
610 Ok(Source::ByValue {
612 by_value: ValueRef {
613 value: json.clone(),
614 },
615 })
616 }
617 }
618
619 #[allow(clippy::only_used_in_recursion)]
621 fn json_to_value(&self, json: &JsonValue) -> Result<Value, RpcError> {
622 match json {
623 JsonValue::Null => Ok(Value::Null),
624 JsonValue::Bool(b) => Ok(Value::Bool(*b)),
625 JsonValue::Number(n) => Ok(Value::Number(n.clone())),
626 JsonValue::String(s) => Ok(Value::String(s.clone())),
627 JsonValue::Array(arr) => {
628 let values = arr
629 .iter()
630 .map(|v| self.json_to_value(v))
631 .collect::<Result<Vec<_>, _>>()?;
632 Ok(Value::Array(values))
633 }
634 JsonValue::Object(obj) => {
635 let mut map = std::collections::HashMap::new();
636 for (key, val) in obj.iter() {
637 map.insert(key.clone(), Box::new(self.json_to_value(val)?));
638 }
639 Ok(Value::Object(map))
640 }
641 }
642 }
643
644 #[allow(clippy::only_used_in_recursion)]
646 fn value_to_json(&self, value: &Value) -> JsonValue {
647 match value {
648 Value::Null => JsonValue::Null,
649 Value::Bool(b) => JsonValue::Bool(*b),
650 Value::Number(n) => JsonValue::Number(n.clone()),
651 Value::String(s) => JsonValue::String(s.clone()),
652 Value::Array(arr) => {
653 JsonValue::Array(arr.iter().map(|v| self.value_to_json(v)).collect())
654 }
655 Value::Object(obj) => {
656 let mut json_obj = serde_json::Map::new();
657 for (key, val) in obj.iter() {
658 json_obj.insert(key.clone(), self.value_to_json(val));
659 }
660 JsonValue::Object(json_obj)
661 }
662 Value::Date(timestamp) => json!(timestamp),
663 Value::Error {
664 error_type,
665 message,
666 stack,
667 } => json!({
668 "error": error_type,
669 "message": message,
670 "stack": stack
671 }),
672 Value::Stub(_) => JsonValue::String("__stub__".to_string()),
673 Value::Promise(_) => JsonValue::String("__promise__".to_string()),
674 }
675 }
676}
677
678#[derive(Clone, Debug)]
680struct NestedCapabilityImpl {
681 name: String,
682 config: Value,
683 parent: Option<CapId>,
684}
685
686#[async_trait]
687impl RpcTarget for NestedCapabilityImpl {
688 async fn call(&self, method: &str, args: Vec<Value>) -> Result<Value, RpcError> {
689 match method {
690 "getName" => Ok(Value::String(self.name.clone())),
691 "getConfig" => Ok(self.config.clone()),
692 "process" => {
693 if let Some(input) = args.first() {
695 Ok(Value::String(format!(
696 "{} processed: {:?}",
697 self.name, input
698 )))
699 } else {
700 Err(RpcError::bad_request("Process requires input"))
701 }
702 }
703 "validate" => {
704 Ok(Value::Bool(true))
706 }
707 _ => Err(RpcError::not_found(format!(
708 "Method {} not found on nested capability",
709 method
710 ))),
711 }
712 }
713
714 async fn get_property(&self, property: &str) -> Result<Value, RpcError> {
715 match property {
716 "name" => Ok(Value::String(self.name.clone())),
717 "config" => Ok(self.config.clone()),
718 "parent" => Ok(match &self.parent {
719 Some(id) => Value::String(id.as_u64().to_string()),
720 None => Value::Null,
721 }),
722 _ => Err(RpcError::not_found(format!(
723 "Property {} not found",
724 property
725 ))),
726 }
727 }
728}
729
730#[async_trait]
731impl RpcTarget for AdvancedCapability {
732 async fn call(&self, method: &str, args: Vec<Value>) -> Result<Value, RpcError> {
733 {
735 let mut count = self.call_count.lock().await;
736 *count += 1;
737 }
738
739 match method {
740 "createResumeToken" => {
744 let config = args
746 .first()
747 .and_then(|v| match v {
748 Value::Object(obj) => Some(obj),
749 _ => None,
750 })
751 .ok_or_else(|| {
752 RpcError::bad_request("createResumeToken requires config object")
753 })?;
754
755 let session_id = config
756 .get("sessionId")
757 .and_then(|v| match v.as_ref() {
758 Value::String(s) => Some(s.as_str()),
759 _ => None,
760 })
761 .ok_or_else(|| RpcError::bad_request("Missing sessionId"))?;
762
763 let include_state = config
764 .get("includeState")
765 .and_then(|v| match v.as_ref() {
766 Value::Bool(b) => Some(*b),
767 _ => None,
768 })
769 .unwrap_or(true);
770
771 let expiration_minutes = config
772 .get("expirationMinutes")
773 .and_then(|v| match v.as_ref() {
774 Value::Number(n) => n.as_u64(),
775 _ => None,
776 })
777 .unwrap_or(60);
778
779 let snapshot = self
781 .resume_manager
782 .create_snapshot(
783 session_id.to_string(),
784 &self.id_allocator,
785 &self.import_table,
786 &self.export_table,
787 None, )
789 .await
790 .map_err(|e| {
791 RpcError::internal(format!("Failed to create snapshot: {:?}", e))
792 })?;
793
794 if include_state {
796 let mut sessions = self.sessions.write().await;
797 let session_state =
798 sessions
799 .entry(session_id.to_string())
800 .or_insert_with(|| SessionState {
801 variables: HashMap::new(),
802 operations: Vec::new(),
803 last_result: None,
804 created_at: Utc::now().timestamp(),
805 last_accessed: Utc::now().timestamp(),
806 });
807 session_state.last_accessed = Utc::now().timestamp();
808 }
809
810 let token = self
812 .resume_manager
813 .generate_token(snapshot.clone())
814 .map_err(|e| {
815 RpcError::internal(format!("Failed to generate token: {:?}", e))
816 })?;
817
818 let response_json = json!({
821 "token": token,
822 "sessionId": session_id,
823 "expiresAt": Utc::now().timestamp() + (expiration_minutes as i64 * 60),
824 "includesState": include_state
825 });
826 self.json_to_value(&response_json)
827 }
828
829 "restoreSession" => {
830 let config = if let Some(Value::Object(obj)) = args.first() {
831 obj
832 } else {
833 return Err(RpcError::bad_request(
834 "restoreSession requires config object",
835 ));
836 };
837
838 let token = if let Some(val) = config.get("token") {
839 if let Value::String(s) = &**val {
840 s.as_str()
841 } else {
842 return Err(RpcError::bad_request("Token must be a string"));
843 }
844 } else {
845 return Err(RpcError::bad_request("Missing token"));
846 };
847
848 let resume_token: ResumeToken = serde_json::from_str(token)
850 .map_err(|e| RpcError::bad_request(format!("Invalid token format: {}", e)))?;
851
852 let snapshot = self
854 .resume_manager
855 .parse_token(&resume_token)
856 .map_err(|e| RpcError::bad_request(format!("Invalid token: {:?}", e)))?;
857
858 let session_id = format!("restored_{}", Utc::now().timestamp());
860 let mut sessions = self.sessions.write().await;
861 sessions.insert(
862 session_id.clone(),
863 SessionState {
864 variables: HashMap::new(),
865 operations: Vec::new(),
866 last_result: None,
867 created_at: snapshot.created_at as i64,
868 last_accessed: Utc::now().timestamp(),
869 },
870 );
871
872 let response = json!({
873 "sessionId": session_id,
874 "restored": true,
875 "createdAt": snapshot.created_at,
876 "version": snapshot.version
877 });
878 self.json_to_value(&response)
879 }
880
881 "setVariable" => {
882 let var_name = if let Some(Value::String(s)) = args.first() {
883 s.as_str()
884 } else {
885 return Err(RpcError::bad_request("setVariable requires variable name"));
886 };
887
888 let var_value = args
889 .get(1)
890 .ok_or_else(|| RpcError::bad_request("setVariable requires value"))?;
891
892 let mut sessions = self.sessions.write().await;
894 let session =
895 sessions
896 .entry("default".to_string())
897 .or_insert_with(|| SessionState {
898 variables: HashMap::new(),
899 operations: Vec::new(),
900 last_result: None,
901 created_at: Utc::now().timestamp(),
902 last_accessed: Utc::now().timestamp(),
903 });
904
905 session
906 .variables
907 .insert(var_name.to_string(), var_value.clone());
908 session.last_accessed = Utc::now().timestamp();
909
910 Ok(Value::Bool(true))
911 }
912
913 "getVariable" => {
914 let var_name = if let Some(Value::String(s)) = args.first() {
915 s.as_str()
916 } else {
917 return Err(RpcError::bad_request("getVariable requires variable name"));
918 };
919
920 let sessions = self.sessions.read().await;
921 let session = sessions
922 .get("default")
923 .ok_or_else(|| RpcError::not_found("No active session"))?;
924
925 session
926 .variables
927 .get(var_name)
928 .cloned()
929 .ok_or_else(|| RpcError::not_found(format!("Variable {} not found", var_name)))
930 }
931
932 "createSubCapability" => {
936 let cap_type = if let Some(Value::String(s)) = args.first() {
937 s.as_str()
938 } else {
939 return Err(RpcError::bad_request("createSubCapability requires type"));
940 };
941
942 let config = args
943 .get(1)
944 .cloned()
945 .unwrap_or(Value::Object(std::collections::HashMap::new()));
946
947 let mut counter = self.capability_counter.lock().await;
949 let cap_id = CapId::new(*counter);
950 let cap_name = format!("{}-{}", cap_type, *counter);
951 *counter += 1;
952
953 let nested_cap = Arc::new(NestedCapabilityImpl {
955 name: cap_type.to_string(),
956 config: config.clone(),
957 parent: None, });
959
960 use capnweb_core::protocol::nested_capabilities::CapabilityNode;
962 let node = CapabilityNode {
963 id: cap_name.clone(),
964 capability_type: cap_type.to_string(),
965 parent_id: None,
966 created_at: chrono::Utc::now().timestamp() as u64,
967 config: config.clone(),
968 metadata: self
969 .capability_factory
970 .get_capability_metadata(cap_type)
971 .unwrap_or(CapabilityMetadata {
972 name: cap_type.to_string(),
973 description: format!("{} capability", cap_type),
974 version: "1.0.0".to_string(),
975 methods: vec![],
976 config_schema: None,
977 }),
978 };
979 self.capability_graph
980 .add_capability(node)
981 .await
982 .map_err(|e| {
983 RpcError::internal(format!("Failed to add capability: {:?}", e))
984 })?;
985
986 let mut capabilities = self.nested_capabilities.write().await;
988 capabilities.insert(cap_name.clone(), nested_cap);
989
990 let response = json!({
991 "capabilityId": cap_id.as_u64(),
992 "type": cap_type,
993 "name": cap_name,
994 "config": self.value_to_json(&config)
995 });
996 self.json_to_value(&response)
997 }
998
999 "callSubCapability" => {
1000 let cap_name = if let Some(Value::String(s)) = args.first() {
1001 s.as_str()
1002 } else {
1003 return Err(RpcError::bad_request(
1004 "callSubCapability requires capability name",
1005 ));
1006 };
1007
1008 let sub_method = if let Some(Value::String(s)) = args.get(1) {
1009 s.as_str()
1010 } else {
1011 return Err(RpcError::bad_request(
1012 "callSubCapability requires method name",
1013 ));
1014 };
1015
1016 let sub_args = args.get(2..).map(|a| a.to_vec()).unwrap_or_else(Vec::new);
1017
1018 let capabilities = self.nested_capabilities.read().await;
1020 let capability = capabilities.get(cap_name).ok_or_else(|| {
1021 RpcError::not_found(format!("Capability {} not found", cap_name))
1022 })?;
1023
1024 capability.call(sub_method, sub_args).await
1025 }
1026
1027 "disposeSubCapability" => {
1028 let cap_name = if let Some(Value::String(s)) = args.first() {
1029 s.as_str()
1030 } else {
1031 return Err(RpcError::bad_request(
1032 "disposeSubCapability requires capability name",
1033 ));
1034 };
1035
1036 let mut capabilities = self.nested_capabilities.write().await;
1038 capabilities.remove(cap_name).ok_or_else(|| {
1039 RpcError::not_found(format!("Capability {} not found", cap_name))
1040 })?;
1041
1042 Ok(Value::Bool(true))
1043 }
1044
1045 "listSubCapabilities" => {
1046 let capabilities = self.nested_capabilities.read().await;
1047 let cap_list: Vec<String> = capabilities.keys().cloned().collect();
1048
1049 Ok(Value::Array(
1050 cap_list.into_iter().map(Value::String).collect(),
1051 ))
1052 }
1053
1054 "executePlan" => {
1058 let plan_json = args
1059 .first()
1060 .ok_or_else(|| RpcError::bad_request("executePlan requires plan"))?;
1061
1062 let parameters = args
1063 .get(1)
1064 .cloned()
1065 .unwrap_or(Value::Object(std::collections::HashMap::new()));
1066
1067 let json_value = self.value_to_json(plan_json);
1069 let plan = self.parse_plan(&json_value)?;
1070
1071 let captures = if let Some(Value::Array(cap_array)) = args.get(2) {
1073 let mut captured_caps = Vec::new();
1074 for cap_ref in cap_array {
1075 if let Value::String(cap_name) = cap_ref {
1076 let capabilities = self.nested_capabilities.read().await;
1077 if let Some(cap) = capabilities.get(cap_name) {
1078 captured_caps.push(cap.clone() as Arc<dyn RpcTarget>);
1079 }
1080 }
1081 }
1082 captured_caps
1083 } else {
1084 vec![Arc::new(self.clone()) as Arc<dyn RpcTarget>]
1085 };
1086
1087 let result = self
1089 .plan_runner
1090 .execute_plan(&plan, parameters, captures)
1091 .await
1092 .map_err(|e| RpcError::internal(format!("Plan execution failed: {:?}", e)))?;
1093
1094 {
1096 let mut sessions = self.sessions.write().await;
1097 let session =
1098 sessions
1099 .entry("default".to_string())
1100 .or_insert_with(|| SessionState {
1101 variables: HashMap::new(),
1102 operations: Vec::new(),
1103 last_result: None,
1104 created_at: Utc::now().timestamp(),
1105 last_accessed: Utc::now().timestamp(),
1106 });
1107 session.last_result = Some(result.clone());
1108 session.operations.push("executePlan".to_string());
1109 }
1110
1111 Ok(result)
1112 }
1113
1114 "createPlan" => {
1115 let plan_name = if let Some(Value::String(s)) = args.first() {
1116 s.as_str()
1117 } else {
1118 return Err(RpcError::bad_request("createPlan requires name"));
1119 };
1120
1121 let operations = args
1122 .get(1)
1123 .ok_or_else(|| RpcError::bad_request("createPlan requires operations"))?;
1124
1125 let json_ops = self.value_to_json(operations);
1127 let plan = self.parse_plan(&json!({
1128 "operations": json_ops
1129 }))?;
1130
1131 let mut cache = self.plan_cache.write().await;
1132 cache.insert(plan_name.to_string(), plan);
1133
1134 let response = json!({
1135 "planName": plan_name,
1136 "cached": true
1137 });
1138 self.json_to_value(&response)
1139 }
1140
1141 "executeCachedPlan" => {
1142 let plan_name = if let Some(Value::String(s)) = args.first() {
1143 s.as_str()
1144 } else {
1145 return Err(RpcError::bad_request(
1146 "executeCachedPlan requires plan name",
1147 ));
1148 };
1149
1150 let parameters = args
1151 .get(1)
1152 .cloned()
1153 .unwrap_or(Value::Object(std::collections::HashMap::new()));
1154
1155 let cache = self.plan_cache.read().await;
1157 let plan = cache
1158 .get(plan_name)
1159 .ok_or_else(|| RpcError::not_found(format!("Plan {} not found", plan_name)))?
1160 .clone();
1161
1162 let result = self
1164 .plan_runner
1165 .execute_plan(&plan, parameters, vec![])
1166 .await
1167 .map_err(|e| RpcError::internal(format!("Plan execution failed: {:?}", e)))?;
1168
1169 Ok(result)
1170 }
1171
1172 "add" => {
1176 if args.len() != 2 {
1177 return Err(RpcError::bad_request("add requires 2 arguments"));
1178 }
1179
1180 let a = if let Value::Number(n) = &args[0] {
1181 n.as_f64()
1182 .ok_or_else(|| RpcError::bad_request("Invalid number"))?
1183 } else {
1184 return Err(RpcError::bad_request("First argument must be a number"));
1185 };
1186 let b = if let Value::Number(n) = &args[1] {
1187 n.as_f64()
1188 .ok_or_else(|| RpcError::bad_request("Invalid number"))?
1189 } else {
1190 return Err(RpcError::bad_request("Second argument must be a number"));
1191 };
1192
1193 let result = Value::Number(
1194 serde_json::Number::from_f64(a + b)
1195 .ok_or_else(|| RpcError::internal("Invalid number result"))?,
1196 );
1197
1198 {
1200 let mut sessions = self.sessions.write().await;
1201 let session =
1202 sessions
1203 .entry("default".to_string())
1204 .or_insert_with(|| SessionState {
1205 variables: HashMap::new(),
1206 operations: Vec::new(),
1207 last_result: None,
1208 created_at: Utc::now().timestamp(),
1209 last_accessed: Utc::now().timestamp(),
1210 });
1211 session.last_result = Some(result.clone());
1212 session.operations.push("add".to_string());
1213 }
1214
1215 Ok(result)
1216 }
1217
1218 "multiply" => {
1219 if args.len() != 2 {
1220 return Err(RpcError::bad_request("multiply requires 2 arguments"));
1221 }
1222
1223 let a = if let Value::Number(n) = &args[0] {
1224 n.as_f64()
1225 .ok_or_else(|| RpcError::bad_request("Invalid number"))?
1226 } else {
1227 return Err(RpcError::bad_request("First argument must be a number"));
1228 };
1229 let b = if let Value::Number(n) = &args[1] {
1230 n.as_f64()
1231 .ok_or_else(|| RpcError::bad_request("Invalid number"))?
1232 } else {
1233 return Err(RpcError::bad_request("Second argument must be a number"));
1234 };
1235
1236 let result = Value::Number(
1237 serde_json::Number::from_f64(a * b)
1238 .ok_or_else(|| RpcError::internal("Invalid number result"))?,
1239 );
1240
1241 {
1243 let mut sessions = self.sessions.write().await;
1244 let session =
1245 sessions
1246 .entry("default".to_string())
1247 .or_insert_with(|| SessionState {
1248 variables: HashMap::new(),
1249 operations: Vec::new(),
1250 last_result: None,
1251 created_at: Utc::now().timestamp(),
1252 last_accessed: Utc::now().timestamp(),
1253 });
1254 session.last_result = Some(result.clone());
1255 session.operations.push("multiply".to_string());
1256 }
1257
1258 Ok(result)
1259 }
1260
1261 "getStats" => {
1262 let count = *self.call_count.lock().await;
1263 let sessions = self.sessions.read().await;
1264 let capabilities = self.nested_capabilities.read().await;
1265 let plans = self.plan_cache.read().await;
1266
1267 let response = json!({
1268 "totalCalls": count,
1269 "activeSessions": sessions.len(),
1270 "nestedCapabilities": capabilities.len(),
1271 "cachedPlans": plans.len()
1272 });
1273 self.json_to_value(&response)
1274 }
1275
1276 _ => Err(RpcError::not_found(format!("Method {} not found", method))),
1277 }
1278 }
1279
1280 async fn get_property(&self, property: &str) -> Result<Value, RpcError> {
1281 match property {
1282 "total_calls" => {
1283 let count = *self.call_count.lock().await;
1284 Ok(Value::Number(serde_json::Number::from(count)))
1285 }
1286 "session_count" => {
1287 let sessions = self.sessions.read().await;
1288 Ok(Value::Number(serde_json::Number::from(sessions.len())))
1289 }
1290 "capability_count" => {
1291 let capabilities = self.nested_capabilities.read().await;
1292 Ok(Value::Number(serde_json::Number::from(capabilities.len())))
1293 }
1294 "cached_plans" => {
1295 let plans = self.plan_cache.read().await;
1296 Ok(Value::Number(serde_json::Number::from(plans.len())))
1297 }
1298 _ => Err(RpcError::not_found(format!(
1299 "Property {} not found",
1300 property
1301 ))),
1302 }
1303 }
1304}
1305
1306impl Clone for AdvancedCapability {
1308 fn clone(&self) -> Self {
1309 Self {
1310 resume_manager: self.resume_manager.clone(),
1311 persistent_manager: self.persistent_manager.clone(),
1312 sessions: self.sessions.clone(),
1313 capability_graph: self.capability_graph.clone(),
1314 capability_factory: self.capability_factory.clone(),
1315 capability_counter: self.capability_counter.clone(),
1316 plan_runner: self.plan_runner.clone(),
1317 plan_cache: self.plan_cache.clone(),
1318 id_allocator: self.id_allocator.clone(),
1319 import_table: self.import_table.clone(),
1320 export_table: self.export_table.clone(),
1321 call_count: self.call_count.clone(),
1322 nested_capabilities: self.nested_capabilities.clone(),
1323 }
1324 }
1325}
1326
1327pub struct AdvancedCapabilityBuilder {
1329 config: AdvancedCapabilityConfig,
1330}
1331
1332impl AdvancedCapabilityBuilder {
1333 pub fn new() -> Self {
1335 Self {
1336 config: AdvancedCapabilityConfig::default(),
1337 }
1338 }
1339
1340 pub fn with_secret_key(mut self, key: Vec<u8>) -> Self {
1342 self.config.secret_key = Some(key);
1343 self
1344 }
1345
1346 pub fn with_token_ttl(mut self, ttl: u64) -> Self {
1348 self.config.token_ttl = ttl;
1349 self
1350 }
1351
1352 pub fn with_max_session_age(mut self, age: u64) -> Self {
1354 self.config.max_session_age = age;
1355 self
1356 }
1357
1358 pub fn with_max_capabilities(mut self, max: usize) -> Self {
1360 self.config.max_capabilities = max;
1361 self
1362 }
1363
1364 pub fn with_max_plan_operations(mut self, max: usize) -> Self {
1366 self.config.max_plan_operations = max;
1367 self
1368 }
1369
1370 pub fn with_plan_timeout(mut self, timeout_ms: u64) -> Self {
1372 self.config.plan_timeout_ms = timeout_ms;
1373 self
1374 }
1375
1376 pub fn build(self) -> AdvancedCapability {
1378 AdvancedCapability::with_config(self.config)
1379 }
1380}
1381
1382impl Default for AdvancedCapabilityBuilder {
1383 fn default() -> Self {
1384 Self::new()
1385 }
1386}
1387
1388#[cfg(test)]
1389mod tests {
1390 use super::*;
1391
1392 #[tokio::test]
1393 async fn test_advanced_capability_creation() {
1394 let cap = AdvancedCapability::new();
1395
1396 let result = cap.call("getStats", vec![]).await;
1398 assert!(result.is_ok());
1399
1400 let stats = result.unwrap();
1401 if let Value::Object(obj) = stats {
1402 assert!(obj.contains_key("totalCalls"));
1403 assert!(obj.contains_key("activeSessions"));
1404 } else {
1405 panic!("Expected object result");
1406 }
1407 }
1408
1409 #[tokio::test]
1410 async fn test_resume_token_methods() {
1411 let cap = AdvancedCapability::new();
1412
1413 let mut config_map = std::collections::HashMap::new();
1415 config_map.insert(
1416 "sessionId".to_string(),
1417 Box::new(Value::String("test123".to_string())),
1418 );
1419 config_map.insert("includeState".to_string(), Box::new(Value::Bool(true)));
1420 config_map.insert(
1421 "expirationMinutes".to_string(),
1422 Box::new(Value::Number(serde_json::Number::from(60))),
1423 );
1424 let config = Value::Object(config_map);
1425
1426 let result = cap.call("createResumeToken", vec![config]).await;
1427 assert!(result.is_ok());
1428
1429 let token_response = result.unwrap();
1430 if let Value::Object(obj) = token_response {
1431 assert!(obj.contains_key("token"));
1432 assert!(obj.contains_key("sessionId"));
1433 assert!(obj.contains_key("expiresAt"));
1434 } else {
1435 panic!("Expected object result");
1436 }
1437 }
1438
1439 #[tokio::test]
1440 async fn test_nested_capability_methods() {
1441 let cap = AdvancedCapability::new();
1442
1443 let mut config_map = std::collections::HashMap::new();
1445 config_map.insert(
1446 "maxLength".to_string(),
1447 Box::new(Value::Number(serde_json::Number::from(100))),
1448 );
1449
1450 let result = cap
1451 .call(
1452 "createSubCapability",
1453 vec![
1454 Value::String("validator".to_string()),
1455 Value::Object(config_map),
1456 ],
1457 )
1458 .await;
1459
1460 assert!(result.is_ok());
1461
1462 let cap_response = result.unwrap();
1463 if let Value::Object(obj) = cap_response {
1464 assert!(obj.contains_key("capabilityId"));
1465 assert!(obj.contains_key("type"));
1466 assert!(obj.contains_key("name"));
1467 } else {
1468 panic!("Expected object result");
1469 }
1470
1471 let list_result = cap.call("listSubCapabilities", vec![]).await;
1473 assert!(list_result.is_ok());
1474
1475 if let Value::Array(arr) = list_result.unwrap() {
1476 assert!(!arr.is_empty());
1477 } else {
1478 panic!("Expected array result");
1479 }
1480 }
1481
1482 #[tokio::test]
1483 async fn test_il_plan_execution() {
1484 let cap = AdvancedCapability::new();
1485
1486 let json_obj = json!({
1488 "operations": []
1489 });
1490
1491 fn json_value_to_core_value(json_val: serde_json::Value) -> Value {
1493 match json_val {
1494 serde_json::Value::Null => Value::Null,
1495 serde_json::Value::Bool(b) => Value::Bool(b),
1496 serde_json::Value::Number(n) => Value::Number(n),
1497 serde_json::Value::String(s) => Value::String(s),
1498 serde_json::Value::Array(arr) => {
1499 Value::Array(arr.into_iter().map(json_value_to_core_value).collect())
1500 }
1501 serde_json::Value::Object(obj) => {
1502 let mut map = std::collections::HashMap::new();
1503 for (k, v) in obj {
1504 map.insert(k, Box::new(json_value_to_core_value(v)));
1505 }
1506 Value::Object(map)
1507 }
1508 }
1509 }
1510
1511 let plan = json_value_to_core_value(json_obj);
1512
1513 let result = cap.call("executePlan", vec![plan, Value::Null]).await;
1514 assert!(result.is_ok() || result.is_err()); }
1518
1519 #[tokio::test]
1520 async fn test_calculator_compatibility() {
1521 let cap = AdvancedCapability::new();
1522
1523 let result = cap
1525 .call(
1526 "add",
1527 vec![
1528 Value::Number(serde_json::Number::from(10)),
1529 Value::Number(serde_json::Number::from(20)),
1530 ],
1531 )
1532 .await;
1533 assert!(result.is_ok());
1534 assert_eq!(
1535 result.unwrap(),
1536 Value::Number(serde_json::Number::from_f64(30.0).unwrap())
1537 );
1538
1539 let result = cap
1541 .call(
1542 "multiply",
1543 vec![
1544 Value::Number(serde_json::Number::from(5)),
1545 Value::Number(serde_json::Number::from(6)),
1546 ],
1547 )
1548 .await;
1549 assert!(result.is_ok());
1550 assert_eq!(
1551 result.unwrap(),
1552 Value::Number(serde_json::Number::from_f64(30.0).unwrap())
1553 );
1554 }
1555}