1use std::pin::Pin;
13use std::sync::Arc;
14
15use async_trait::async_trait;
16use tracing::{debug, warn};
17
18use apcore::context::Context;
19use apcore::errors::ModuleError;
20use apcore::Registry;
21use apcore::{ChunkStream, StreamingModule};
22
23use crate::output::types::{Verifier, WriteResult};
24use crate::output::verifiers::{run_verifier_chain, RegistryVerifier};
25use crate::types::ScannedModule;
26
27pub type StreamHandlerFn =
34 Arc<dyn Fn(serde_json::Value, &Context<serde_json::Value>) -> ChunkStream + Send + Sync>;
35
36pub type StreamingHandlerFactory = Arc<dyn Fn(&str) -> Option<StreamHandlerFn> + Send + Sync>;
45
46pub type HandlerFn = Arc<
48 dyn for<'a> Fn(
49 serde_json::Value,
50 &'a Context<serde_json::Value>,
51 ) -> Pin<
52 Box<
53 dyn std::future::Future<Output = Result<serde_json::Value, ModuleError>>
54 + Send
55 + 'a,
56 >,
57 > + Send
58 + Sync,
59>;
60
61pub type HandlerFactory = Arc<dyn Fn(&str) -> Option<HandlerFn> + Send + Sync>;
78
79struct StreamingFunctionModule {
85 inner: apcore::decorator::FunctionModule,
86 stream_fn: StreamHandlerFn,
87}
88
89#[async_trait]
90impl apcore::module::Module for StreamingFunctionModule {
91 fn input_schema(&self) -> serde_json::Value {
92 self.inner.input_schema()
93 }
94 fn output_schema(&self) -> serde_json::Value {
95 self.inner.output_schema()
96 }
97 fn description(&self) -> &str {
98 self.inner.description()
99 }
100 async fn execute(
101 &self,
102 inputs: serde_json::Value,
103 ctx: &Context<serde_json::Value>,
104 ) -> Result<serde_json::Value, ModuleError> {
105 self.inner.execute(inputs, ctx).await
106 }
107 fn stream(
108 &self,
109 inputs: serde_json::Value,
110 ctx: &Context<serde_json::Value>,
111 ) -> Option<ChunkStream> {
112 Some((self.stream_fn)(inputs, ctx))
113 }
114 fn as_streaming(&self) -> Option<&dyn StreamingModule> {
115 Some(self)
116 }
117}
118
119impl StreamingModule for StreamingFunctionModule {
120 fn stream_typed(
121 &self,
122 inputs: serde_json::Value,
123 ctx: &Context<serde_json::Value>,
124 ) -> ChunkStream {
125 (self.stream_fn)(inputs, ctx)
126 }
127}
128
129pub struct RegistryWriter {
143 handler_factory: Option<HandlerFactory>,
144 streaming_handler_factory: Option<StreamingHandlerFactory>,
151 allowed_prefixes: Option<Vec<String>>,
157}
158
159impl Default for RegistryWriter {
160 fn default() -> Self {
161 Self::new()
162 }
163}
164
165impl RegistryWriter {
166 pub fn new() -> Self {
185 Self {
186 handler_factory: None,
187 streaming_handler_factory: None,
188 allowed_prefixes: None,
189 }
190 }
191
192 pub fn with_handler_factory(factory: HandlerFactory) -> Self {
194 Self {
195 handler_factory: Some(factory),
196 streaming_handler_factory: None,
197 allowed_prefixes: None,
198 }
199 }
200
201 pub fn with_streaming_handler_factory(mut self, factory: StreamingHandlerFactory) -> Self {
225 self.streaming_handler_factory = Some(factory);
226 self
227 }
228
229 pub fn with_allowed_prefixes(mut self, prefixes: Vec<String>) -> Self {
238 self.allowed_prefixes = Some(prefixes);
239 self
240 }
241
242 fn target_allowed(&self, target: &str) -> bool {
250 match self.allowed_prefixes.as_ref() {
251 None => true,
252 Some(prefixes) => {
253 let module_path = target.split(':').next().unwrap_or(target);
254 prefixes
255 .iter()
256 .any(|p| module_path_matches_prefix(module_path, p))
257 }
258 }
259 }
260}
261
262fn module_path_matches_prefix(module_path: &str, prefix: &str) -> bool {
269 let normalized = prefix.trim_end_matches('.');
270 if normalized.is_empty() {
271 return false;
272 }
273 if module_path == normalized {
274 return true;
275 }
276 let mut boundary = String::with_capacity(normalized.len() + 1);
277 boundary.push_str(normalized);
278 boundary.push('.');
279 module_path.starts_with(&boundary)
280}
281
282impl RegistryWriter {
283 pub fn write(
298 &self,
299 modules: &[ScannedModule],
300 registry: &mut Registry,
301 dry_run: bool,
302 verify: bool,
303 verifiers: Option<&[&dyn Verifier]>,
304 ) -> Vec<WriteResult> {
305 let mut results: Vec<WriteResult> = Vec::new();
306
307 for module in modules {
308 if dry_run {
309 results.push(WriteResult::new(module.module_id.clone()));
310 continue;
311 }
312
313 if !self.target_allowed(&module.target) {
314 warn!(
315 module_id = %module.module_id,
316 target = %module.target,
317 "RegistryWriter: target rejected by allowed_prefixes"
318 );
319 results.push(WriteResult::failed(
320 module.module_id.clone(),
321 None,
322 format!(
323 "target '{}' is not in allowed_prefixes — registration refused",
324 module.target
325 ),
326 ));
327 continue;
328 }
329
330 let (module_obj, descriptor) = self.to_module(module);
331 if let Err(e) = registry.register(&module.module_id, module_obj, descriptor) {
335 warn!(
336 module_id = %module.module_id,
337 error = %e,
338 "RegistryWriter registration failed"
339 );
340 results.push(WriteResult::failed(
341 module.module_id.clone(),
342 None,
343 format!("Registration failed: {e}"),
344 ));
345 continue;
346 }
347 debug!("Registered module: {}", module.module_id);
348
349 let mut result = WriteResult::new(module.module_id.clone());
350 if verify {
351 result = verify_registry(&result, &module.module_id, registry);
352 }
353 if let Some(vs) = verifiers {
359 let chain_result = run_verifier_chain(vs, "", &module.module_id);
360 if !chain_result.ok {
361 result = WriteResult::failed(
362 result.module_id,
363 result.path,
364 chain_result.error.unwrap_or_default(),
365 );
366 }
367 }
368 results.push(result);
369 }
370
371 results
372 }
373}
374
375impl RegistryWriter {
376 fn to_module(
388 &self,
389 module: &ScannedModule,
390 ) -> (
391 Box<dyn apcore::module::Module + Send + Sync>,
392 apcore::registry::registry::ModuleDescriptor,
393 ) {
394 let mut annotations = module.annotations.clone().unwrap_or_default();
395 let input_schema = module.input_schema.clone();
396 let output_schema = module.output_schema.clone();
397
398 let exec_handler: HandlerFn = if let Some(factory) = &self.handler_factory {
400 if let Some(handler) = factory(&module.target) {
401 handler
402 } else {
403 Self::passthrough_handler()
404 }
405 } else {
406 debug!(
407 module_id = %module.module_id,
408 "RegistryWriter using passthrough handler (no HandlerFactory configured)",
409 );
410 Self::passthrough_handler()
411 };
412
413 if annotations.streaming {
415 if let Some(stream_fn) = self
416 .streaming_handler_factory
417 .as_ref()
418 .and_then(|f| f(&module.target))
419 {
420 let inner = apcore::decorator::FunctionModule::with_description(
422 annotations.clone(),
423 input_schema.clone(),
424 output_schema.clone(),
425 module.description.clone(),
426 module.documentation.clone(),
427 module.tags.clone(),
428 module.version.clone(),
429 module.metadata.clone(),
430 module.examples.clone(),
431 move |inputs, ctx| exec_handler(inputs, ctx),
432 );
433 let descriptor = Self::make_descriptor(module, &annotations);
434 return (
435 Box::new(StreamingFunctionModule { inner, stream_fn }),
436 descriptor,
437 );
438 }
439
440 warn!(
442 module_id = %module.module_id,
443 target = %module.target,
444 "RegistryWriter: module declares annotations.streaming=true but no \
445 StreamingHandlerFactory provided a handler for this target; clearing \
446 streaming flag to avoid StreamingInterfaceMismatch at registration. \
447 Attach a StreamingHandlerFactory via with_streaming_handler_factory().",
448 );
449 annotations.streaming = false;
450 }
451
452 let fm = apcore::decorator::FunctionModule::with_description(
453 annotations.clone(),
454 input_schema,
455 output_schema,
456 module.description.clone(),
457 module.documentation.clone(),
458 module.tags.clone(),
459 module.version.clone(),
460 module.metadata.clone(),
461 module.examples.clone(),
462 move |inputs, ctx| exec_handler(inputs, ctx),
463 );
464 let descriptor = Self::make_descriptor(module, &annotations);
465 (Box::new(fm), descriptor)
466 }
467
468 fn passthrough_handler() -> HandlerFn {
469 Arc::new(|inputs, _ctx| Box::pin(async move { Ok(inputs) }))
470 }
471
472 fn make_descriptor(
473 module: &ScannedModule,
474 annotations: &apcore::module::ModuleAnnotations,
475 ) -> apcore::registry::registry::ModuleDescriptor {
476 apcore::registry::registry::ModuleDescriptor {
477 module_id: module.module_id.clone(),
478 name: Some(module.module_id.clone()),
479 description: module.description.clone(),
480 documentation: module.documentation.clone(),
481 input_schema: module.input_schema.clone(),
482 output_schema: module.output_schema.clone(),
483 version: module.version.clone(),
484 tags: module.tags.clone(),
485 annotations: Some(annotations.clone()),
486 examples: module.examples.clone(),
487 metadata: module.metadata.clone(),
488 display: module.display.clone(),
489 sunset_date: None,
490 dependencies: vec![],
491 enabled: true,
492 }
493 }
494}
495
496fn verify_registry(result: &WriteResult, module_id: &str, registry: &Registry) -> WriteResult {
498 let verifier = RegistryVerifier::new(registry);
499 let vr = verifier.verify("", module_id);
500 if vr.ok {
501 result.clone()
502 } else {
503 WriteResult::failed(module_id.into(), None, vr.error.unwrap_or_default())
504 }
505}
506
507#[cfg(test)]
508mod tests {
509 use super::*;
510 use serde_json::json;
511
512 fn sample_module() -> ScannedModule {
513 ScannedModule::new(
514 "users.get".into(),
515 "Get user".into(),
516 json!({"type": "object"}),
517 json!({"type": "object"}),
518 vec!["users".into()],
519 "app:get_user".into(),
520 )
521 }
522
523 #[test]
524 fn test_write_dry_run() {
525 let writer = RegistryWriter::new();
526 let mut registry = Registry::new();
527 let modules = vec![sample_module()];
528 let results = writer.write(&modules, &mut registry, true, false, None);
529 assert_eq!(results.len(), 1);
530 assert_eq!(results[0].module_id, "users.get");
531 assert!(!registry.has("users.get"));
532 }
533
534 #[test]
535 fn test_write_registers_module() {
536 let writer = RegistryWriter::new();
537 let mut registry = Registry::new();
538 let modules = vec![sample_module()];
539 let results = writer.write(&modules, &mut registry, false, false, None);
540 assert_eq!(results.len(), 1);
541 assert!(registry.has("users.get"));
542 }
543
544 #[test]
545 fn test_write_with_verify() {
546 let writer = RegistryWriter::new();
547 let mut registry = Registry::new();
548 let modules = vec![sample_module()];
549 let results = writer.write(&modules, &mut registry, false, true, None);
550 assert_eq!(results.len(), 1);
551 assert!(results[0].verified);
552 }
553
554 #[test]
555 fn test_write_empty_list() {
556 let writer = RegistryWriter::new();
557 let mut registry = Registry::new();
558 let results = writer.write(&[], &mut registry, false, false, None);
559 assert!(results.is_empty());
560 }
561
562 #[test]
563 fn test_custom_verifier_runs_even_when_verify_false() {
564 use crate::output::types::{Verifier, VerifyResult};
568
569 struct AlwaysFail;
570 impl Verifier for AlwaysFail {
571 fn verify(&self, _path: &str, _module_id: &str) -> VerifyResult {
572 VerifyResult::fail("custom verifier failed".into())
573 }
574 }
575
576 let writer = RegistryWriter::new();
577 let mut registry = Registry::new();
578 let modules = vec![sample_module()];
579 let failing_verifier = AlwaysFail;
580 let verifiers: &[&dyn Verifier] = &[&failing_verifier];
581 let results = writer.write(&modules, &mut registry, false, false, Some(verifiers));
583 assert_eq!(results.len(), 1);
584 assert!(registry.has("users.get"));
586 assert!(
588 !results[0].verified,
589 "custom verifier must run even when verify=false; result: {:?}",
590 results[0]
591 );
592 assert!(
593 results[0]
594 .verification_error
595 .as_deref()
596 .unwrap_or("")
597 .contains("custom verifier failed"),
598 "verification_error should contain the custom verifier message"
599 );
600 }
601
602 #[test]
603 fn test_write_multiple_modules() {
604 let writer = RegistryWriter::new();
605 let mut registry = Registry::new();
606 let modules = vec![
607 ScannedModule::new(
608 "mod.a".into(),
609 "A".into(),
610 json!({"type": "object"}),
611 json!({"type": "object"}),
612 vec![],
613 "app:a".into(),
614 ),
615 ScannedModule::new(
616 "mod.b".into(),
617 "B".into(),
618 json!({"type": "object"}),
619 json!({"type": "object"}),
620 vec![],
621 "app:b".into(),
622 ),
623 ];
624 let results = writer.write(&modules, &mut registry, false, false, None);
625 assert_eq!(results.len(), 2);
626 assert!(registry.has("mod.a"));
627 assert!(registry.has("mod.b"));
628 assert!(results[0].verified);
629 assert!(results[1].verified);
630 }
631
632 #[test]
636 fn test_allowed_prefixes_rejects_non_matching_target() {
637 let writer =
641 RegistryWriter::new().with_allowed_prefixes(vec!["app".into(), "myapp".into()]);
642 let mut registry = Registry::new();
643 let allowed = sample_module(); let denied = ScannedModule::new(
645 "evil.module".into(),
646 "Forged target".into(),
647 json!({"type": "object"}),
648 json!({"type": "object"}),
649 vec![],
650 "evil:run_attacker_code".into(),
651 );
652 let results = writer.write(&[allowed, denied], &mut registry, false, false, None);
653 assert_eq!(results.len(), 2);
654 assert!(registry.has("users.get"));
656 assert!(results[0].verified);
657 assert!(!registry.has("evil.module"));
659 assert!(!results[1].verified);
660 let err = results[1].verification_error.as_deref().unwrap_or("");
661 assert!(
662 err.contains("allowed_prefixes"),
663 "rejection message should mention allowed_prefixes: got {err:?}"
664 );
665 }
666
667 #[test]
672 fn test_target_allowed_boundary_aware() {
673 let writer = RegistryWriter::new().with_allowed_prefixes(vec!["myapp".into()]);
674 assert!(writer.target_allowed("myapp:fn"));
676 assert!(writer.target_allowed("myapp.foo:fn"));
678 assert!(writer.target_allowed("myapp.foo.bar:fn"));
679 assert!(!writer.target_allowed("myappx.evil:fn"));
681 assert!(!writer.target_allowed("myappx:fn"));
682 assert!(!writer.target_allowed("other:fn"));
684
685 let writer2 = RegistryWriter::new().with_allowed_prefixes(vec!["myapp.foo".into()]);
687 assert!(writer2.target_allowed("myapp.foo:fn"));
688 assert!(writer2.target_allowed("myapp.foo.bar:fn"));
689 assert!(!writer2.target_allowed("myapp.foobar:fn"));
690 assert!(!writer2.target_allowed("myapp:fn"));
691
692 let writer3 = RegistryWriter::new().with_allowed_prefixes(vec!["myapp.".into()]);
694 assert!(writer3.target_allowed("myapp:fn"));
695 let writer4 = RegistryWriter::new().with_allowed_prefixes(vec!["".into()]);
696 assert!(!writer4.target_allowed("anything:fn"));
697 }
698
699 #[test]
700 fn test_allowed_prefixes_default_none_admits_everything() {
701 let writer = RegistryWriter::new();
705 let mut registry = Registry::new();
706 let module = ScannedModule::new(
707 "any.module".into(),
708 "Any target".into(),
709 json!({"type": "object"}),
710 json!({"type": "object"}),
711 vec![],
712 "anything-goes:func".into(),
713 );
714 let results = writer.write(&[module], &mut registry, false, false, None);
715 assert_eq!(results.len(), 1);
716 assert!(registry.has("any.module"));
717 }
718
719 fn make_streaming_module() -> ScannedModule {
722 use apcore::module::ModuleAnnotations;
723 let mut m = ScannedModule::new(
724 "stream.test".into(),
725 "streaming module".into(),
726 json!({"type": "object"}),
727 json!({"type": "object"}),
728 vec![],
729 "app:stream_handler".into(),
730 );
731 m.annotations = Some(ModuleAnnotations {
732 streaming: true,
733 ..Default::default()
734 });
735 m
736 }
737
738 #[test]
739 fn test_streaming_factory_registers_streaming_module() {
740 use futures::stream;
743 let stream_factory: StreamingHandlerFactory = Arc::new(|_target: &str| {
744 Some(Arc::new(|_inputs: serde_json::Value, _ctx: &_| {
745 let s = stream::iter(vec![Ok(json!({"chunk": 1}))]);
746 Box::pin(s) as ChunkStream
747 }))
748 });
749
750 let writer = RegistryWriter::new().with_streaming_handler_factory(stream_factory);
751 let mut registry = Registry::new();
752 let module = make_streaming_module();
753 let results = writer.write(&[module], &mut registry, false, false, None);
754
755 assert_eq!(results.len(), 1);
756 assert!(
757 results[0].verified,
758 "streaming module should register successfully"
759 );
760 assert!(registry.has("stream.test"));
761 }
762
763 #[test]
764 fn test_streaming_annotation_no_factory_clears_streaming_and_warns() {
765 let writer = RegistryWriter::new(); let mut registry = Registry::new();
770 let module = make_streaming_module();
771 let results = writer.write(&[module], &mut registry, false, false, None);
772
773 assert_eq!(results.len(), 1);
774 assert!(
775 results[0].verified,
776 "module should register even without streaming factory"
777 );
778 assert!(registry.has("stream.test"));
779 }
780
781 #[test]
782 fn test_non_streaming_module_unaffected_by_streaming_factory() {
783 use futures::stream;
786 let stream_factory: StreamingHandlerFactory = Arc::new(|_: &str| {
787 Some(Arc::new(|inputs: serde_json::Value, _ctx: &_| {
788 let s = stream::iter(vec![Ok(inputs)]);
789 Box::pin(s) as ChunkStream
790 }))
791 });
792
793 let writer = RegistryWriter::new().with_streaming_handler_factory(stream_factory);
794 let mut registry = Registry::new();
795 let module = sample_module(); let results = writer.write(&[module], &mut registry, false, false, None);
797
798 assert_eq!(results.len(), 1);
799 assert!(results[0].verified);
800 assert!(registry.has("users.get"));
801 }
802
803 #[test]
804 fn test_custom_verifier_runs_unconditionally_and_merge_semantics() {
805 use crate::output::types::{Verifier, VerifyResult};
810
811 struct AlwaysFail;
812 impl Verifier for AlwaysFail {
813 fn verify(&self, _path: &str, _module_id: &str) -> VerifyResult {
814 VerifyResult::fail("custom fail and-merge".into())
815 }
816 }
817
818 let writer = RegistryWriter::new();
819 let mut registry = Registry::new();
820 let modules = vec![sample_module()];
821 let failing_verifier = AlwaysFail;
822 let verifiers: &[&dyn Verifier] = &[&failing_verifier];
823 let results = writer.write(&modules, &mut registry, false, true, Some(verifiers));
826 assert_eq!(results.len(), 1);
827 assert!(registry.has("users.get"), "module must be registered");
828 assert!(
829 !results[0].verified,
830 "AND-merge: builtin=true AND custom=false must yield verified=false; got: {:?}",
831 results[0]
832 );
833 assert!(
834 results[0]
835 .verification_error
836 .as_deref()
837 .unwrap_or("")
838 .contains("custom fail and-merge"),
839 "verification_error must contain the custom verifier message"
840 );
841 }
842}