1use super::errors::PackageError;
2use super::*;
3
4pub(crate) fn manifest_capabilities(
5 manifest: &Manifest,
6) -> Option<&harn_vm::llm::capabilities::CapabilitiesFile> {
7 manifest.capabilities.as_ref()
8}
9
10pub(crate) fn is_empty_capabilities(file: &harn_vm::llm::capabilities::CapabilitiesFile) -> bool {
11 file.provider.is_empty() && file.provider_family.is_empty()
12}
13
14pub fn validate_runtime_manifest_extensions(anchor: &Path) -> Result<(), PackageError> {
15 let Some((manifest, _manifest_dir)) = find_nearest_manifest(anchor) else {
16 return Ok(());
17 };
18 validate_handoff_routes(&manifest.handoff_routes, &manifest)
19}
20
21pub fn try_load_runtime_extensions(anchor: &Path) -> Result<RuntimeExtensions, PackageError> {
24 ensure_dependencies_materialized(anchor)?;
25 let Some((root_manifest, manifest_dir)) = find_nearest_manifest(anchor) else {
26 return Ok(RuntimeExtensions::default());
27 };
28
29 let mut llm = harn_vm::llm_config::ProvidersConfig::default();
30 let mut capabilities = harn_vm::llm::capabilities::CapabilitiesFile::default();
31 let mut hooks = Vec::new();
32 let mut triggers = Vec::new();
33
34 llm.merge_from(&root_manifest.llm);
35 if let Some(file) = manifest_capabilities(&root_manifest) {
36 merge_capability_overrides(&mut capabilities, file);
37 }
38 hooks.extend(resolved_hooks_from_manifest(&root_manifest, &manifest_dir));
39 triggers.extend(resolved_triggers_from_manifest(
40 &root_manifest,
41 &manifest_dir,
42 ));
43 let handoff_routes = root_manifest.handoff_routes.clone();
44 validate_handoff_routes(&handoff_routes, &root_manifest)?;
45 let provider_connectors =
46 resolved_provider_connectors_from_manifest(&root_manifest, &manifest_dir);
47
48 Ok(RuntimeExtensions {
49 root_manifest_path: Some(manifest_dir.join(MANIFEST)),
50 root_manifest_dir: Some(manifest_dir),
51 root_manifest: Some(root_manifest),
52 llm: (!llm.is_empty()).then_some(llm),
53 capabilities: (!is_empty_capabilities(&capabilities)).then_some(capabilities),
54 hooks,
55 triggers,
56 handoff_routes,
57 provider_connectors,
58 })
59}
60
61pub fn load_runtime_extensions(anchor: &Path) -> RuntimeExtensions {
62 match try_load_runtime_extensions(anchor) {
63 Ok(extensions) => extensions,
64 Err(error) => {
65 eprintln!("error: {error}");
66 process::exit(1);
67 }
68 }
69}
70
71pub fn install_runtime_extensions(extensions: &RuntimeExtensions) {
73 harn_vm::llm_config::set_user_overrides(extensions.llm.clone());
74 harn_vm::llm::capabilities::set_user_overrides(extensions.capabilities.clone());
75 install_manifest_handoff_routes(extensions);
76 install_orchestrator_budget(extensions);
77}
78
79pub fn install_manifest_handoff_routes(extensions: &RuntimeExtensions) {
80 harn_vm::install_handoff_routes(extensions.handoff_routes.clone());
81}
82
83pub fn install_orchestrator_budget(extensions: &RuntimeExtensions) {
84 let budget = extensions
85 .root_manifest
86 .as_ref()
87 .map(|manifest| harn_vm::OrchestratorBudgetConfig {
88 daily_cost_usd: manifest.orchestrator.budget.daily_cost_usd,
89 hourly_cost_usd: manifest.orchestrator.budget.hourly_cost_usd,
90 })
91 .unwrap_or_default();
92 harn_vm::install_orchestrator_budget(budget);
93}
94
95pub async fn install_manifest_hooks(
96 vm: &mut harn_vm::Vm,
97 extensions: &RuntimeExtensions,
98) -> Result<(), PackageError> {
99 harn_vm::orchestration::clear_runtime_hooks();
100 let mut loaded_exports: HashMap<ManifestModuleCacheKey, ManifestModuleExports> = HashMap::new();
101 for hook in &extensions.hooks {
102 let Some((module_name, function_name)) = hook.handler.rsplit_once("::") else {
103 return Err(format!(
104 "invalid hook handler '{}': expected <module>::<function>",
105 hook.handler
106 )
107 .into());
108 };
109 let cache_key = (
110 hook.manifest_dir.clone(),
111 hook.package_name.clone(),
112 Some(module_name.to_string()),
113 );
114 if !loaded_exports.contains_key(&cache_key) {
115 let exports = resolve_manifest_exports(
116 vm,
117 &hook.manifest_dir,
118 hook.package_name.as_deref(),
119 &hook.exports,
120 Some(module_name),
121 )
122 .await?;
123 loaded_exports.insert(cache_key.clone(), exports);
124 }
125 let exports = loaded_exports
126 .get(&cache_key)
127 .expect("manifest hook exports cached");
128 let Some(closure) = exports.get(function_name) else {
129 return Err(format!(
130 "hook handler '{function_name}' is not exported by module '{module_name}'"
131 )
132 .into());
133 };
134 harn_vm::orchestration::register_vm_hook(
135 hook.event,
136 hook.pattern.clone(),
137 hook.handler.clone(),
138 closure.clone(),
139 );
140 }
141 Ok(())
142}
143
/// Resolves every manifest trigger into a `CollectedManifestTrigger`.
///
/// For each trigger this resolves the handler (local closure, A2A target,
/// worker queue or persona binding), the optional `when` predicate (including
/// a check of its declared signature) and the flow-control configuration.
/// The provider catalog built at the start is installed only after every
/// trigger has been collected successfully.
///
/// # Errors
///
/// Fails when the provider catalog cannot be built, when the orchestrator
/// budget or the static trigger configs do not validate, or when any trigger's
/// handler, predicate or flow-control settings cannot be resolved.
pub async fn collect_manifest_triggers(
    vm: &mut harn_vm::Vm,
    extensions: &RuntimeExtensions,
) -> Result<Vec<CollectedManifestTrigger>, PackageError> {
    // The guard is held for the rest of the function.
    let _provider_schema_guard = lock_manifest_provider_schemas().await;
    let provider_catalog = build_manifest_provider_catalog(extensions).await?;
    validate_orchestrator_budget(extensions.root_manifest.as_ref())?;
    validate_static_trigger_configs(&extensions.triggers, &provider_catalog)?;
    // Module exports are cached per (manifest dir, package, module) and function
    // signatures per source path, so a module shared by several triggers is
    // only resolved once.
    let mut loaded_exports: HashMap<ManifestModuleCacheKey, ManifestModuleExports> = HashMap::new();
    let mut module_signatures: HashMap<PathBuf, BTreeMap<String, TriggerFunctionSignature>> =
        HashMap::new();
    let mut collected = Vec::new();

    for trigger in &extensions.triggers {
        let handler = parse_trigger_handler_uri(trigger)?;
        let collected_handler = match handler {
            // Local handlers must resolve to an exported closure.
            TriggerHandlerUri::Local(reference) => {
                let cache_key = (
                    trigger.manifest_dir.clone(),
                    trigger.package_name.clone(),
                    reference.module_name.clone(),
                );
                if !loaded_exports.contains_key(&cache_key) {
                    let exports = resolve_manifest_exports(
                        vm,
                        &trigger.manifest_dir,
                        trigger.package_name.as_deref(),
                        &trigger.exports,
                        reference.module_name.as_deref(),
                    )
                    .await
                    .map_err(|error| trigger_error(trigger, error))?;
                    loaded_exports.insert(cache_key.clone(), exports);
                }
                let exports = loaded_exports
                    .get(&cache_key)
                    .expect("manifest trigger exports cached");
                let Some(closure) = exports.get(&reference.function_name) else {
                    return Err(trigger_error(
                        trigger,
                        format!(
                            "handler '{}' is not exported by the resolved module",
                            reference.raw
                        ),
                    ));
                };
                CollectedTriggerHandler::Local {
                    reference,
                    closure: closure.clone(),
                }
            }
            // A2A and worker handlers carry their parsed fields through unchanged.
            TriggerHandlerUri::A2a {
                target,
                allow_cleartext,
            } => CollectedTriggerHandler::A2a {
                target,
                allow_cleartext,
            },
            TriggerHandlerUri::Worker { queue } => CollectedTriggerHandler::Worker { queue },
            // Persona handlers are bound to a persona declared in the root manifest.
            TriggerHandlerUri::Persona { name } => {
                let binding = persona_runtime_binding_for_handler(extensions, trigger, &name)?;
                CollectedTriggerHandler::Persona { binding }
            }
        };

        let collected_when = if let Some(when_raw) = &trigger.when {
            let reference = parse_local_trigger_ref(when_raw, "when", trigger)?;
            let cache_key = (
                trigger.manifest_dir.clone(),
                trigger.package_name.clone(),
                reference.module_name.clone(),
            );
            if !loaded_exports.contains_key(&cache_key) {
                let exports = resolve_manifest_exports(
                    vm,
                    &trigger.manifest_dir,
                    trigger.package_name.as_deref(),
                    &trigger.exports,
                    reference.module_name.as_deref(),
                )
                .await
                .map_err(|error| trigger_error(trigger, error))?;
                loaded_exports.insert(cache_key.clone(), exports);
            }
            let exports = loaded_exports
                .get(&cache_key)
                .expect("manifest trigger predicate exports cached");
            let Some(closure) = exports.get(&reference.function_name) else {
                return Err(trigger_error(
                    trigger,
                    format!(
                        "when predicate '{}' is not exported by the resolved module",
                        reference.raw
                    ),
                ));
            };

            // Beyond being exported, the predicate's declared signature is
            // checked against the module's function signatures.
            let source_path = manifest_module_source_path(
                &trigger.manifest_dir,
                trigger.package_name.as_deref(),
                &trigger.exports,
                reference.module_name.as_deref(),
            )
            .map_err(|error| trigger_error(trigger, error))?;
            if !module_signatures.contains_key(&source_path) {
                let signatures = load_trigger_function_signatures(&source_path)
                    .map_err(|error| trigger_error(trigger, error))?;
                module_signatures.insert(source_path.clone(), signatures);
            }
            let signatures = module_signatures
                .get(&source_path)
                .expect("module signatures cached");
            let Some(signature) = signatures.get(&reference.function_name) else {
                return Err(trigger_error(
                    trigger,
                    format!(
                        "when predicate '{}' must resolve to a function declaration",
                        reference.raw
                    ),
                ));
            };
            // Exactly one parameter, and it must carry a type annotation that
            // `is_trigger_event_type` accepts.
            if signature.params.len() != 1
                || signature.params[0]
                    .as_ref()
                    .is_none_or(|param| !is_trigger_event_type(param))
            {
                return Err(trigger_error(
                    trigger,
                    format!(
                        "when predicate '{}' must have signature fn(TriggerEvent) -> bool",
                        reference.raw
                    ),
                ));
            }
            // A missing return type annotation is rejected as well.
            if signature
                .return_type
                .as_ref()
                .is_none_or(|return_type| !is_predicate_return_type(return_type))
            {
                return Err(trigger_error(
                    trigger,
                    format!(
                        "when predicate '{}' must have signature fn(TriggerEvent) -> bool or Result<bool, _>",
                        reference.raw
                    ),
                ));
            }

            Some(CollectedTriggerPredicate {
                reference,
                closure: closure.clone(),
            })
        } else {
            None
        };

        let flow_control = collect_trigger_flow_control(vm, trigger).await?;

        collected.push(CollectedManifestTrigger {
            config: trigger.clone(),
            handler: collected_handler,
            when: collected_when,
            flow_control,
        });
    }

    // Installed last, so an error above returns before the catalog is installed.
    harn_vm::install_provider_catalog(provider_catalog);
    Ok(collected)
}
313
314pub(crate) async fn collect_trigger_flow_control(
315 vm: &mut harn_vm::Vm,
316 trigger: &ResolvedTriggerConfig,
317) -> Result<harn_vm::TriggerFlowControlConfig, PackageError> {
318 let mut flow = harn_vm::TriggerFlowControlConfig::default();
319
320 let concurrency = if let Some(spec) = &trigger.concurrency {
321 Some(spec.clone())
322 } else if let Some(max) = trigger.budget.max_concurrent {
323 eprintln!(
324 "warning: {} uses deprecated budget.max_concurrent; prefer concurrency = {{ max = {} }}",
325 manifest_trigger_location(trigger),
326 max
327 );
328 Some(TriggerConcurrencyManifestSpec { key: None, max })
329 } else {
330 None
331 };
332 if let Some(spec) = concurrency {
333 flow.concurrency = Some(harn_vm::TriggerConcurrencyConfig {
334 key: compile_optional_trigger_expression(
335 vm,
336 trigger,
337 "concurrency.key",
338 spec.key.as_deref(),
339 )
340 .await?,
341 max: spec.max,
342 });
343 }
344
345 if let Some(spec) = &trigger.throttle {
346 flow.throttle = Some(harn_vm::TriggerThrottleConfig {
347 key: compile_optional_trigger_expression(
348 vm,
349 trigger,
350 "throttle.key",
351 spec.key.as_deref(),
352 )
353 .await?,
354 period: harn_vm::parse_flow_control_duration(&spec.period)
355 .map_err(|error| trigger_error(trigger, format!("throttle.period {error}")))?,
356 max: spec.max,
357 });
358 }
359
360 if let Some(spec) = &trigger.rate_limit {
361 flow.rate_limit = Some(harn_vm::TriggerRateLimitConfig {
362 key: compile_optional_trigger_expression(
363 vm,
364 trigger,
365 "rate_limit.key",
366 spec.key.as_deref(),
367 )
368 .await?,
369 period: harn_vm::parse_flow_control_duration(&spec.period)
370 .map_err(|error| trigger_error(trigger, format!("rate_limit.period {error}")))?,
371 max: spec.max,
372 });
373 }
374
375 if let Some(spec) = &trigger.debounce {
376 flow.debounce = Some(harn_vm::TriggerDebounceConfig {
377 key: compile_trigger_expression(vm, trigger, "debounce.key", &spec.key).await?,
378 period: harn_vm::parse_flow_control_duration(&spec.period)
379 .map_err(|error| trigger_error(trigger, format!("debounce.period {error}")))?,
380 });
381 }
382
383 if let Some(spec) = &trigger.singleton {
384 flow.singleton = Some(harn_vm::TriggerSingletonConfig {
385 key: compile_optional_trigger_expression(
386 vm,
387 trigger,
388 "singleton.key",
389 spec.key.as_deref(),
390 )
391 .await?,
392 });
393 }
394
395 if let Some(spec) = &trigger.batch {
396 flow.batch = Some(harn_vm::TriggerBatchConfig {
397 key: compile_optional_trigger_expression(vm, trigger, "batch.key", spec.key.as_deref())
398 .await?,
399 size: spec.size,
400 timeout: harn_vm::parse_flow_control_duration(&spec.timeout)
401 .map_err(|error| trigger_error(trigger, format!("batch.timeout {error}")))?,
402 });
403 }
404
405 if let Some(spec) = &trigger.priority_flow {
406 flow.priority = Some(harn_vm::TriggerPriorityOrderConfig {
407 key: compile_trigger_expression(vm, trigger, "priority.key", &spec.key).await?,
408 order: spec.order.clone(),
409 });
410 }
411
412 Ok(flow)
413}
414
415fn persona_runtime_binding_for_handler(
416 extensions: &RuntimeExtensions,
417 trigger: &ResolvedTriggerConfig,
418 name: &str,
419) -> Result<harn_vm::PersonaRuntimeBinding, PackageError> {
420 let Some(manifest) = extensions.root_manifest.as_ref() else {
421 return Err(trigger_error(
422 trigger,
423 format!("handler persona://{name} requires a root manifest"),
424 ));
425 };
426 let Some(persona) = manifest
427 .personas
428 .iter()
429 .find(|persona| persona.name.as_deref() == Some(name))
430 else {
431 return Err(trigger_error(
432 trigger,
433 format!("handler persona://{name} does not match a declared persona"),
434 ));
435 };
436 Ok(persona_runtime_binding(name, persona))
437}
438
439pub(crate) fn persona_runtime_binding(
443 name: &str,
444 persona: &PersonaManifestEntry,
445) -> harn_vm::PersonaRuntimeBinding {
446 harn_vm::PersonaRuntimeBinding {
447 name: name.to_string(),
448 template_ref: persona_template_ref(persona),
449 entry_workflow: persona.entry_workflow.clone().unwrap_or_default(),
450 schedules: persona.schedules.clone(),
451 triggers: persona.triggers.clone(),
452 budget: harn_vm::PersonaBudgetPolicy {
453 daily_usd: persona.budget.daily_usd,
454 hourly_usd: persona.budget.hourly_usd,
455 run_usd: persona.budget.run_usd,
456 max_tokens: persona.budget.max_tokens,
457 },
458 stages: persona
459 .stages
460 .iter()
461 .map(persona_stage_decl_to_runtime)
462 .collect(),
463 }
464}
465
466fn persona_stage_decl_to_runtime(stage: &PersonaStageDecl) -> harn_vm::StageDecl {
467 harn_vm::StageDecl {
468 name: stage.name.clone(),
469 allowed_tools: stage.allowed_tools.clone(),
470 side_effect_level: stage.side_effect_level.clone(),
471 max_iterations: stage.max_iterations,
472 on_exit: stage.on_exit.as_ref().map(|exit| harn_vm::StageExit {
473 on_complete: exit.on_complete.clone(),
474 on_failure: exit.on_failure.clone(),
475 policy_override: None,
476 }),
477 }
478}
479
480pub(crate) async fn compile_optional_trigger_expression(
481 vm: &mut harn_vm::Vm,
482 trigger: &ResolvedTriggerConfig,
483 field_name: &str,
484 expr: Option<&str>,
485) -> Result<Option<harn_vm::TriggerExpressionSpec>, PackageError> {
486 match expr {
487 Some(expr) => compile_trigger_expression(vm, trigger, field_name, expr)
488 .await
489 .map(Some),
490 None => Ok(None),
491 }
492}
493
494pub(crate) async fn compile_trigger_expression(
495 vm: &mut harn_vm::Vm,
496 trigger: &ResolvedTriggerConfig,
497 field_name: &str,
498 expr: &str,
499) -> Result<harn_vm::TriggerExpressionSpec, PackageError> {
500 let synthetic = PathBuf::from(format!(
501 "<trigger-expr>/{}/{:04}-{}.harn",
502 harn_vm::event_log::sanitize_topic_component(&trigger.id),
503 trigger.table_index,
504 harn_vm::event_log::sanitize_topic_component(field_name),
505 ));
506 let source = format!(
507 "import \"std/triggers\"\n\npub fn __trigger_expr(event: TriggerEvent) -> any {{\n return {expr}\n}}\n"
508 );
509 let exports = vm
510 .load_module_exports_from_source(synthetic, &source)
511 .await
512 .map_err(|error| {
513 trigger_error(
514 trigger,
515 format!("{field_name} '{expr}' is invalid Harn expression: {error}"),
516 )
517 })?;
518 let closure = exports.get("__trigger_expr").ok_or_else(|| {
519 trigger_error(
520 trigger,
521 format!("{field_name} '{expr}' did not compile into an exported closure"),
522 )
523 })?;
524 Ok(harn_vm::TriggerExpressionSpec {
525 raw: expr.to_string(),
526 closure: closure.clone(),
527 })
528}
529
530pub(crate) fn trigger_kind_label(kind: TriggerKind) -> &'static str {
531 match kind {
532 TriggerKind::Webhook => "webhook",
533 TriggerKind::Cron => "cron",
534 TriggerKind::Poll => "poll",
535 TriggerKind::Stream => "stream",
536 TriggerKind::Predicate => "predicate",
537 TriggerKind::A2aPush => "a2a-push",
538 }
539}
540
541pub(crate) fn worker_queue_priority(
542 priority: TriggerDispatchPriority,
543) -> harn_vm::WorkerQueuePriority {
544 match priority {
545 TriggerDispatchPriority::High => harn_vm::WorkerQueuePriority::High,
546 TriggerDispatchPriority::Normal => harn_vm::WorkerQueuePriority::Normal,
547 TriggerDispatchPriority::Low => harn_vm::WorkerQueuePriority::Low,
548 }
549}
550
551pub fn manifest_trigger_binding_spec(
552 trigger: CollectedManifestTrigger,
553) -> harn_vm::TriggerBindingSpec {
554 let flow_control = trigger.flow_control.clone();
555 let config = trigger.config;
556 let (handler, handler_descriptor) = match trigger.handler {
557 CollectedTriggerHandler::Local { reference, closure } => (
558 harn_vm::TriggerHandlerSpec::Local {
559 raw: reference.raw.clone(),
560 closure,
561 },
562 serde_json::json!({
563 "kind": "local",
564 "raw": reference.raw,
565 }),
566 ),
567 CollectedTriggerHandler::A2a {
568 target,
569 allow_cleartext,
570 } => (
571 harn_vm::TriggerHandlerSpec::A2a {
572 target: target.clone(),
573 allow_cleartext,
574 },
575 serde_json::json!({
576 "kind": "a2a",
577 "target": target,
578 "allow_cleartext": allow_cleartext,
579 }),
580 ),
581 CollectedTriggerHandler::Worker { queue } => (
582 harn_vm::TriggerHandlerSpec::Worker {
583 queue: queue.clone(),
584 },
585 serde_json::json!({
586 "kind": "worker",
587 "queue": queue,
588 }),
589 ),
590 CollectedTriggerHandler::Persona { binding } => (
591 harn_vm::TriggerHandlerSpec::Persona {
592 binding: binding.clone(),
593 },
594 serde_json::json!({
595 "kind": "persona",
596 "name": binding.name,
597 "entry_workflow": binding.entry_workflow,
598 }),
599 ),
600 };
601
602 let when_raw = trigger
603 .when
604 .as_ref()
605 .map(|predicate| predicate.reference.raw.clone());
606 let when = trigger.when.map(|predicate| harn_vm::TriggerPredicateSpec {
607 raw: predicate.reference.raw,
608 closure: predicate.closure,
609 });
610 let mut when_budget = config
611 .when_budget
612 .as_ref()
613 .map(|budget| {
614 Ok::<harn_vm::TriggerPredicateBudget, String>(harn_vm::TriggerPredicateBudget {
615 max_cost_usd: budget.max_cost_usd,
616 tokens_max: budget.tokens_max,
617 timeout_ms: budget
618 .timeout
619 .as_deref()
620 .map(parse_duration_millis)
621 .transpose()?,
622 })
623 })
624 .transpose()
625 .unwrap_or_default();
626 if config.budget.max_cost_usd.is_some() || config.budget.max_tokens.is_some() {
627 let budget = when_budget.get_or_insert_with(harn_vm::TriggerPredicateBudget::default);
628 if budget.max_cost_usd.is_none() {
629 budget.max_cost_usd = config.budget.max_cost_usd;
630 }
631 if budget.tokens_max.is_none() {
632 budget.tokens_max = config.budget.max_tokens;
633 }
634 }
635 let id = config.id.clone();
636 let kind = trigger_kind_label(config.kind).to_string();
637 let provider = config.provider.clone();
638 let autonomy_tier = config.autonomy_tier;
639 let match_events = config.match_.events.clone();
640 let dedupe_key = config.dedupe_key.clone();
641 let retry = harn_vm::TriggerRetryConfig::new(
642 config.retry.max,
643 match config.retry.backoff {
644 TriggerRetryBackoff::Immediate => harn_vm::RetryPolicy::Linear { delay_ms: 0 },
645 TriggerRetryBackoff::Svix => harn_vm::RetryPolicy::Svix,
646 },
647 );
648 let filter = config.filter.clone();
649 let dedupe_retention_days = config.retry.retention_days;
650 let daily_cost_usd = config.budget.daily_cost_usd;
651 let hourly_cost_usd = config.budget.hourly_cost_usd;
652 let max_autonomous_decisions_per_hour = config.budget.max_autonomous_decisions_per_hour;
653 let max_autonomous_decisions_per_day = config.budget.max_autonomous_decisions_per_day;
654 let on_budget_exhausted = config.budget.on_budget_exhausted;
655 let max_concurrent = flow_control.concurrency.as_ref().map(|config| config.max);
656 let manifest_path = Some(config.manifest_path.clone());
657 let package_name = config.package_name.clone();
658
659 let fingerprint = serde_json::to_string(&serde_json::json!({
660 "id": &id,
661 "kind": &kind,
662 "provider": provider.as_str(),
663 "autonomy_tier": autonomy_tier,
664 "match": config.match_,
665 "when": when_raw,
666 "when_budget": config.when_budget,
667 "handler": handler_descriptor,
668 "dedupe_key": &dedupe_key,
669 "retry": config.retry,
670 "dispatch_priority": config.dispatch_priority,
671 "budget": config.budget,
672 "flow_control": {
673 "concurrency": config.concurrency,
674 "throttle": config.throttle,
675 "rate_limit": config.rate_limit,
676 "debounce": config.debounce,
677 "singleton": config.singleton,
678 "batch": config.batch,
679 "priority": config.priority_flow,
680 },
681 "window": config.window,
682 "secrets": config.secrets,
683 "filter": &filter,
684 "kind_specific": config.kind_specific,
685 "manifest_path": &manifest_path,
686 "package_name": &package_name,
687 }))
688 .unwrap_or_else(|_| format!("{}:{}:{}", id, kind, provider.as_str()));
689
690 harn_vm::TriggerBindingSpec {
691 id,
692 source: harn_vm::TriggerBindingSource::Manifest,
693 kind,
694 provider,
695 autonomy_tier,
696 handler,
697 dispatch_priority: worker_queue_priority(config.dispatch_priority),
698 when,
699 when_budget,
700 retry,
701 match_events,
702 dedupe_key,
703 filter,
704 dedupe_retention_days,
705 daily_cost_usd,
706 hourly_cost_usd,
707 max_autonomous_decisions_per_hour,
708 max_autonomous_decisions_per_day,
709 on_budget_exhausted,
710 max_concurrent,
711 flow_control,
712 aggregation: None,
713 manifest_path,
714 package_name,
715 definition_fingerprint: fingerprint,
716 }
717}
718
719pub async fn install_manifest_triggers(
720 vm: &mut harn_vm::Vm,
721 extensions: &RuntimeExtensions,
722) -> Result<(), PackageError> {
723 install_orchestrator_budget(extensions);
724 let collected = collect_manifest_triggers(vm, extensions).await?;
725 let mut bindings: Vec<_> = collected
726 .iter()
727 .cloned()
728 .map(manifest_trigger_binding_spec)
729 .collect();
730 bindings.extend(collect_persona_trigger_binding_specs(extensions)?);
731 harn_vm::install_manifest_triggers(bindings)
732 .await
733 .map_err(|error| PackageError::Extensions(error.to_string()))
734}
735
736pub async fn install_collected_manifest_triggers(
737 collected: &[CollectedManifestTrigger],
738) -> Result<(), PackageError> {
739 let bindings = collected
740 .iter()
741 .cloned()
742 .map(manifest_trigger_binding_spec)
743 .collect();
744 harn_vm::install_manifest_triggers(bindings)
745 .await
746 .map_err(|error| PackageError::Extensions(error.to_string()))
747}
748
749pub fn collect_persona_trigger_binding_specs(
750 extensions: &RuntimeExtensions,
751) -> Result<Vec<harn_vm::TriggerBindingSpec>, PackageError> {
752 let Some(manifest) = extensions.root_manifest.clone() else {
753 return Ok(Vec::new());
754 };
755 let manifest_path = extensions
756 .root_manifest_path
757 .clone()
758 .unwrap_or_else(|| PathBuf::from(MANIFEST));
759 let manifest_dir = extensions
760 .root_manifest_dir
761 .clone()
762 .or_else(|| manifest_path.parent().map(Path::to_path_buf))
763 .unwrap_or_else(|| PathBuf::from("."));
764 let resolved =
765 validate_and_resolve_personas(manifest, manifest_path, manifest_dir).map_err(|errors| {
766 errors
767 .iter()
768 .map(ToString::to_string)
769 .collect::<Vec<_>>()
770 .join("\n")
771 })?;
772 let mut bindings = Vec::new();
773 for persona in resolved.personas {
774 let Some(name) = persona.name.clone() else {
775 continue;
776 };
777 for trigger in &persona.triggers {
778 let Some((provider, kind)) = trigger.split_once('.') else {
779 continue;
780 };
781 let provider = provider.trim();
782 let kind = kind.trim();
783 if provider.is_empty() || kind.is_empty() {
784 continue;
785 }
786 bindings.push(persona_trigger_binding_spec(
787 &resolved.manifest_path,
788 &name,
789 provider,
790 kind,
791 &persona,
792 ));
793 }
794 }
795 Ok(bindings)
796}
797
798fn persona_trigger_binding_spec(
799 manifest_path: &Path,
800 name: &str,
801 provider: &str,
802 kind: &str,
803 persona: &PersonaManifestEntry,
804) -> harn_vm::TriggerBindingSpec {
805 let runtime_binding = persona_runtime_binding(name, persona);
806 let id = format!("persona.{name}.{provider}.{kind}");
807 let handler = harn_vm::TriggerHandlerSpec::Persona {
808 binding: runtime_binding.clone(),
809 };
810 let fingerprint = serde_json::to_string(&serde_json::json!({
811 "id": &id,
812 "kind": kind,
813 "provider": provider,
814 "handler": {
815 "kind": "persona",
816 "name": name,
817 "entry_workflow": runtime_binding.entry_workflow,
818 },
819 "budget": runtime_binding.budget,
820 "manifest_path": manifest_path,
821 }))
822 .unwrap_or_else(|_| format!("{id}:{provider}:{kind}:{name}"));
823
824 harn_vm::TriggerBindingSpec {
825 id,
826 source: harn_vm::TriggerBindingSource::Manifest,
827 kind: kind.to_string(),
828 provider: harn_vm::ProviderId::from(provider.to_string()),
829 autonomy_tier: persona
830 .autonomy_tier
831 .map(persona_autonomy_to_vm)
832 .unwrap_or(harn_vm::AutonomyTier::Suggest),
833 handler,
834 dispatch_priority: harn_vm::WorkerQueuePriority::Normal,
835 when: None,
836 when_budget: None,
837 retry: harn_vm::TriggerRetryConfig::default(),
838 match_events: vec![kind.to_string()],
839 dedupe_key: None,
840 filter: None,
841 dedupe_retention_days: 7,
842 daily_cost_usd: persona.budget.daily_usd,
843 hourly_cost_usd: persona.budget.hourly_usd,
844 max_autonomous_decisions_per_hour: None,
845 max_autonomous_decisions_per_day: None,
846 on_budget_exhausted: harn_vm::TriggerBudgetExhaustionStrategy::RetryLater,
847 max_concurrent: None,
848 flow_control: harn_vm::TriggerFlowControlConfig::default(),
849 aggregation: None,
850 manifest_path: Some(manifest_path.to_path_buf()),
851 package_name: None,
852 definition_fingerprint: fingerprint,
853 }
854}
855
856fn persona_autonomy_to_vm(value: PersonaAutonomyTier) -> harn_vm::AutonomyTier {
857 match value {
858 PersonaAutonomyTier::Shadow => harn_vm::AutonomyTier::Shadow,
859 PersonaAutonomyTier::Suggest => harn_vm::AutonomyTier::Suggest,
860 PersonaAutonomyTier::ActWithApproval => harn_vm::AutonomyTier::ActWithApproval,
861 PersonaAutonomyTier::ActAuto => harn_vm::AutonomyTier::ActAuto,
862 }
863}
864
865fn persona_template_ref(persona: &PersonaManifestEntry) -> Option<String> {
866 persona
867 .package_source
868 .package
869 .as_ref()
870 .zip(persona.version.as_ref())
871 .map(|(package, version)| format!("{package}@{version}"))
872 .or_else(|| persona.package_source.package.clone())
873 .or_else(|| {
874 persona
875 .name
876 .as_ref()
877 .zip(persona.version.as_ref())
878 .map(|(name, version)| format!("{name}@{version}"))
879 })
880}
881
882pub fn load_personas_from_manifest_path(
883 manifest_path: &Path,
884) -> Result<ResolvedPersonaManifest, Vec<PersonaValidationError>> {
885 let manifest_path = if manifest_path.is_dir() {
886 manifest_path.join(MANIFEST)
887 } else {
888 manifest_path.to_path_buf()
889 };
890 let manifest_dir = manifest_path
891 .parent()
892 .map(Path::to_path_buf)
893 .unwrap_or_else(|| PathBuf::from("."));
894 if manifest_path.extension().and_then(|ext| ext.to_str()) == Some("harn") {
895 return match harn_modules::personas::parse_persona_source_file(&manifest_path) {
896 Ok(document) if !document.personas.is_empty() => {
897 validate_and_resolve_standalone_personas(
898 document.personas,
899 manifest_path,
900 manifest_dir,
901 )
902 }
903 Ok(_) => Err(vec![PersonaValidationError {
904 manifest_path: manifest_path.clone(),
905 field_path: "persona".to_string(),
906 message: "no @persona declarations found".to_string(),
907 }]),
908 Err(message) => Err(vec![PersonaValidationError {
909 manifest_path: manifest_path.clone(),
910 field_path: "persona".to_string(),
911 message,
912 }]),
913 };
914 }
915 let manifest = match read_manifest_from_path(&manifest_path) {
916 Ok(manifest) => manifest,
917 Err(message) => {
918 if let Ok(document) =
919 harn_modules::personas::parse_persona_manifest_file(&manifest_path)
920 {
921 if !document.personas.is_empty() {
922 return validate_and_resolve_standalone_personas(
923 document.personas,
924 manifest_path,
925 manifest_dir,
926 );
927 }
928 }
929 return Err(vec![PersonaValidationError {
930 manifest_path: manifest_path.clone(),
931 field_path: "harn.toml".to_string(),
932 message: message.to_string(),
933 }]);
934 }
935 };
936 if manifest.personas.is_empty() {
937 if let Ok(document) = harn_modules::personas::parse_persona_manifest_file(&manifest_path) {
938 if !document.personas.is_empty() {
939 return validate_and_resolve_standalone_personas(
940 document.personas,
941 manifest_path,
942 manifest_dir,
943 );
944 }
945 }
946 }
947 validate_and_resolve_personas(manifest, manifest_path, manifest_dir)
948}
949
950fn validate_and_resolve_standalone_personas(
951 personas: Vec<PersonaManifestEntry>,
952 manifest_path: PathBuf,
953 manifest_dir: PathBuf,
954) -> Result<ResolvedPersonaManifest, Vec<PersonaValidationError>> {
955 let known_names = personas
956 .iter()
957 .filter_map(|persona| persona.name.as_ref())
958 .filter(|name| !name.trim().is_empty())
959 .cloned()
960 .collect();
961 let context = harn_modules::personas::PersonaValidationContext {
962 known_capabilities: harn_modules::personas::default_persona_capabilities(),
963 known_tools: BTreeSet::new(),
964 known_names,
965 };
966 harn_modules::personas::validate_persona_manifests(&manifest_path, &personas, &context)?;
967 Ok(ResolvedPersonaManifest {
968 manifest_path,
969 manifest_dir,
970 personas,
971 })
972}
973
974pub fn load_personas_config(
975 anchor: Option<&Path>,
976) -> Result<Option<ResolvedPersonaManifest>, Vec<PersonaValidationError>> {
977 let anchor = anchor
978 .map(Path::to_path_buf)
979 .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
980 let Some((manifest, dir)) = find_nearest_manifest(&anchor) else {
981 return Ok(None);
982 };
983 let manifest_path = dir.join(MANIFEST);
984 validate_and_resolve_personas(manifest, manifest_path, dir).map(Some)
985}
986
987pub(crate) fn validate_and_resolve_personas(
988 manifest: Manifest,
989 manifest_path: PathBuf,
990 manifest_dir: PathBuf,
991) -> Result<ResolvedPersonaManifest, Vec<PersonaValidationError>> {
992 let known_capabilities = known_persona_capabilities(&manifest, &manifest_dir);
993 let known_tools = known_persona_tools(&manifest);
994 let known_names: BTreeSet<String> = manifest
995 .personas
996 .iter()
997 .filter_map(|persona| persona.name.as_ref())
998 .filter(|name| !name.trim().is_empty())
999 .cloned()
1000 .collect();
1001 let context = harn_modules::personas::PersonaValidationContext {
1002 known_capabilities,
1003 known_tools,
1004 known_names,
1005 };
1006 if let Err(errors) = harn_modules::personas::validate_persona_manifests(
1007 &manifest_path,
1008 &manifest.personas,
1009 &context,
1010 ) {
1011 Err(errors)
1012 } else {
1013 let mut personas = manifest.personas;
1014 attach_entry_workflow_steps(&mut personas, &manifest_dir);
1015 Ok(ResolvedPersonaManifest {
1016 manifest_path,
1017 manifest_dir,
1018 personas,
1019 })
1020 }
1021}
1022
1023fn attach_entry_workflow_steps(personas: &mut [PersonaManifestEntry], manifest_dir: &Path) {
1024 for persona in personas {
1025 if !persona.steps.is_empty() {
1026 continue;
1027 }
1028 let Some(entry_workflow) = persona.entry_workflow.as_deref() else {
1029 continue;
1030 };
1031 let Some((path, entry_name)) = entry_workflow.split_once('#') else {
1032 continue;
1033 };
1034 if !path.ends_with(".harn") {
1035 continue;
1036 }
1037 let source_path = manifest_dir.join(path);
1038 let Ok(document) = harn_modules::personas::parse_persona_source_file(&source_path) else {
1039 continue;
1040 };
1041 let entry_name = entry_name.trim();
1042 if let Some(source_persona) = document.personas.iter().find(|candidate| {
1043 candidate.entry_workflow.as_deref() == Some(entry_name)
1044 || candidate.name.as_deref() == persona.name.as_deref()
1045 }) {
1046 persona.steps.clone_from(&source_persona.steps);
1047 }
1048 }
1049}
1050
1051pub(crate) fn known_persona_capabilities(
1052 manifest: &Manifest,
1053 manifest_dir: &Path,
1054) -> BTreeSet<String> {
1055 let mut capabilities = BTreeSet::new();
1056 for (capability, operations) in default_persona_capability_map() {
1057 for operation in operations {
1058 capabilities.insert(format!("{capability}.{operation}"));
1059 }
1060 }
1061 for (capability, operations) in &manifest.check.host_capabilities {
1062 for operation in operations {
1063 capabilities.insert(format!("{capability}.{operation}"));
1064 }
1065 }
1066 if let Some(path) = manifest.check.host_capabilities_path.as_deref() {
1067 let path = PathBuf::from(path);
1068 let path = if path.is_absolute() {
1069 path
1070 } else {
1071 manifest_dir.join(path)
1072 };
1073 if let Ok(content) = fs::read_to_string(path) {
1074 let parsed_json = serde_json::from_str::<serde_json::Value>(&content).ok();
1075 let parsed_toml = toml::from_str::<toml::Value>(&content)
1076 .ok()
1077 .and_then(|value| serde_json::to_value(value).ok());
1078 if let Some(value) = parsed_json.or(parsed_toml) {
1079 collect_persona_capabilities_from_json(&value, &mut capabilities);
1080 }
1081 }
1082 }
1083 capabilities
1084}
1085
1086pub(crate) fn collect_persona_capabilities_from_json(
1087 value: &serde_json::Value,
1088 out: &mut BTreeSet<String>,
1089) {
1090 let root = value.get("capabilities").unwrap_or(value);
1091 let Some(capabilities) = root.as_object() else {
1092 return;
1093 };
1094 for (capability, entry) in capabilities {
1095 if let Some(list) = entry.as_array() {
1096 for item in list {
1097 if let Some(operation) = item.as_str() {
1098 out.insert(format!("{capability}.{operation}"));
1099 }
1100 }
1101 } else if let Some(obj) = entry.as_object() {
1102 if let Some(list) = obj
1103 .get("operations")
1104 .or_else(|| obj.get("ops"))
1105 .and_then(|v| v.as_array())
1106 {
1107 for item in list {
1108 if let Some(operation) = item.as_str() {
1109 out.insert(format!("{capability}.{operation}"));
1110 }
1111 }
1112 } else {
1113 for (operation, enabled) in obj {
1114 if enabled.as_bool().unwrap_or(true) {
1115 out.insert(format!("{capability}.{operation}"));
1116 }
1117 }
1118 }
1119 }
1120 }
1121}
1122
1123pub(crate) fn default_persona_capability_map() -> BTreeMap<&'static str, Vec<&'static str>> {
1124 harn_modules::personas::default_persona_capability_map()
1125}
1126
1127pub(crate) fn known_persona_tools(manifest: &Manifest) -> BTreeSet<String> {
1128 let mut tools = BTreeSet::from([
1129 "a2a".to_string(),
1130 "acp".to_string(),
1131 "ci".to_string(),
1132 "filesystem".to_string(),
1133 "github".to_string(),
1134 "linear".to_string(),
1135 "mcp".to_string(),
1136 "notion".to_string(),
1137 "pagerduty".to_string(),
1138 "shell".to_string(),
1139 "slack".to_string(),
1140 ]);
1141 for server in &manifest.mcp {
1142 tools.insert(server.name.clone());
1143 }
1144 for provider in &manifest.providers {
1145 tools.insert(provider.id.as_str().to_string());
1146 }
1147 for trigger in &manifest.triggers {
1148 if let Some(provider) = trigger.provider.as_ref() {
1149 tools.insert(provider.as_str().to_string());
1150 }
1151 for source in &trigger.sources {
1152 tools.insert(source.provider.as_str().to_string());
1153 }
1154 }
1155 tools
1156}
1157
// Test module for this file: compiled only for test builds, with its source
// taken from the file named by the `#[path]` attribute.
#[cfg(test)]
#[path = "extensions_tests.rs"]
mod tests;