Skip to main content

loong_kernel/
integration.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use serde::{Deserialize, Serialize};
4
5use crate::{contracts::Capability, errors::IntegrationError, pack::VerticalPackManifest};
6
7#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
8pub struct ProviderConfig {
9    pub provider_id: String,
10    pub connector_name: String,
11    pub version: String,
12    pub metadata: BTreeMap<String, String>,
13}
14
15#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
16pub struct ChannelConfig {
17    pub channel_id: String,
18    pub provider_id: String,
19    pub endpoint: String,
20    pub enabled: bool,
21    pub metadata: BTreeMap<String, String>,
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub struct ProviderTemplate {
26    pub provider_id: String,
27    pub default_connector_name: String,
28    pub default_version: String,
29    pub metadata: BTreeMap<String, String>,
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
33pub struct IntegrationCatalog {
34    providers: BTreeMap<String, ProviderConfig>,
35    channels: BTreeMap<String, ChannelConfig>,
36    templates: BTreeMap<String, ProviderTemplate>,
37    revision: u64,
38}
39
40impl IntegrationCatalog {
41    #[must_use]
42    pub fn new() -> Self {
43        Self::default()
44    }
45
46    #[must_use]
47    pub fn revision(&self) -> u64 {
48        self.revision
49    }
50
51    pub fn register_template(&mut self, template: ProviderTemplate) {
52        self.templates
53            .insert(template.provider_id.clone(), template);
54    }
55
56    #[must_use]
57    pub fn template(&self, provider_id: &str) -> Option<&ProviderTemplate> {
58        self.templates.get(provider_id)
59    }
60
61    pub fn upsert_provider(&mut self, provider: ProviderConfig) {
62        self.providers
63            .insert(provider.provider_id.clone(), provider);
64        self.revision = self.revision.saturating_add(1);
65    }
66
67    pub fn upsert_channel(&mut self, channel: ChannelConfig) {
68        self.channels.insert(channel.channel_id.clone(), channel);
69        self.revision = self.revision.saturating_add(1);
70    }
71
72    #[must_use]
73    pub fn provider(&self, provider_id: &str) -> Option<&ProviderConfig> {
74        self.providers.get(provider_id)
75    }
76
77    #[must_use]
78    pub fn channel(&self, channel_id: &str) -> Option<&ChannelConfig> {
79        self.channels.get(channel_id)
80    }
81
82    #[must_use]
83    pub fn providers(&self) -> Vec<ProviderConfig> {
84        self.providers.values().cloned().collect()
85    }
86
87    #[must_use]
88    pub fn channels_for_provider(&self, provider_id: &str) -> Vec<ChannelConfig> {
89        self.channels
90            .values()
91            .filter(|channel| channel.provider_id == provider_id)
92            .cloned()
93            .collect()
94    }
95
96    pub fn apply_plan(
97        &mut self,
98        pack: &mut VerticalPackManifest,
99        plan: &ProvisionPlan,
100    ) -> Result<(), IntegrationError> {
101        for action in &plan.actions {
102            match action {
103                ProvisionAction::AddProvider { provider, .. }
104                | ProvisionAction::PatchProvider { provider, .. } => {
105                    self.upsert_provider(provider.clone());
106                }
107                ProvisionAction::AddChannel { channel, .. }
108                | ProvisionAction::PatchChannel { channel, .. } => {
109                    self.upsert_channel(channel.clone());
110                }
111            }
112        }
113
114        for connector in &plan.pack_connector_additions {
115            pack.allowed_connectors.insert(connector.clone());
116        }
117        for capability in &plan.pack_capability_additions {
118            pack.granted_capabilities.insert(*capability);
119        }
120
121        Ok(())
122    }
123
124    pub fn apply_hotfix(&mut self, hotfix: &IntegrationHotfix) -> Result<(), IntegrationError> {
125        match hotfix {
126            IntegrationHotfix::ProviderVersion {
127                provider_id,
128                new_version,
129            } => {
130                let provider = self
131                    .providers
132                    .get_mut(provider_id)
133                    .ok_or_else(|| IntegrationError::ProviderNotFound(provider_id.clone()))?;
134                provider.version = new_version.clone();
135                self.revision = self.revision.saturating_add(1);
136            }
137            IntegrationHotfix::ProviderConnector {
138                provider_id,
139                new_connector_name,
140            } => {
141                let provider = self
142                    .providers
143                    .get_mut(provider_id)
144                    .ok_or_else(|| IntegrationError::ProviderNotFound(provider_id.clone()))?;
145                provider.connector_name = new_connector_name.clone();
146                self.revision = self.revision.saturating_add(1);
147            }
148            IntegrationHotfix::ChannelEndpoint {
149                channel_id,
150                new_endpoint,
151            } => {
152                let channel = self
153                    .channels
154                    .get_mut(channel_id)
155                    .ok_or_else(|| IntegrationError::ChannelNotFound(channel_id.clone()))?;
156                channel.endpoint = new_endpoint.clone();
157                self.revision = self.revision.saturating_add(1);
158            }
159            IntegrationHotfix::ChannelEnabled {
160                channel_id,
161                enabled,
162            } => {
163                let channel = self
164                    .channels
165                    .get_mut(channel_id)
166                    .ok_or_else(|| IntegrationError::ChannelNotFound(channel_id.clone()))?;
167                channel.enabled = *enabled;
168                self.revision = self.revision.saturating_add(1);
169            }
170        }
171        Ok(())
172    }
173}
174
175#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
176pub struct AutoProvisionRequest {
177    pub provider_id: String,
178    pub channel_id: String,
179    pub connector_name: Option<String>,
180    pub endpoint: Option<String>,
181    pub required_capabilities: BTreeSet<Capability>,
182}
183
184#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
185pub enum ProvisionAction {
186    AddProvider {
187        provider: ProviderConfig,
188        reason: String,
189    },
190    PatchProvider {
191        provider: ProviderConfig,
192        reason: String,
193    },
194    AddChannel {
195        channel: ChannelConfig,
196        reason: String,
197    },
198    PatchChannel {
199        channel: ChannelConfig,
200        reason: String,
201    },
202}
203
204#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
205pub struct ProvisionPlan {
206    pub actions: Vec<ProvisionAction>,
207    pub pack_connector_additions: BTreeSet<String>,
208    pub pack_capability_additions: BTreeSet<Capability>,
209}
210
211impl ProvisionPlan {
212    #[must_use]
213    pub fn is_noop(&self) -> bool {
214        self.actions.is_empty()
215            && self.pack_connector_additions.is_empty()
216            && self.pack_capability_additions.is_empty()
217    }
218}
219
220#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
221pub enum IntegrationHotfix {
222    ProviderVersion {
223        provider_id: String,
224        new_version: String,
225    },
226    ProviderConnector {
227        provider_id: String,
228        new_connector_name: String,
229    },
230    ChannelEndpoint {
231        channel_id: String,
232        new_endpoint: String,
233    },
234    ChannelEnabled {
235        channel_id: String,
236        enabled: bool,
237    },
238}
239
240#[derive(Debug, Default)]
241pub struct AutoProvisionAgent;
242
243impl AutoProvisionAgent {
244    #[must_use]
245    pub fn new() -> Self {
246        Self
247    }
248
249    pub fn plan(
250        &self,
251        catalog: &IntegrationCatalog,
252        pack: &VerticalPackManifest,
253        request: &AutoProvisionRequest,
254    ) -> Result<ProvisionPlan, IntegrationError> {
255        let mut plan = ProvisionPlan::default();
256
257        let provider = match catalog.provider(&request.provider_id) {
258            Some(existing) => {
259                if let Some(expected_connector) = request.connector_name.as_deref() {
260                    if existing.connector_name != expected_connector {
261                        let mut patched = existing.clone();
262                        patched.connector_name = expected_connector.to_owned();
263                        plan.actions.push(ProvisionAction::PatchProvider {
264                            provider: patched.clone(),
265                            reason: "connector override required by request".to_owned(),
266                        });
267                        patched
268                    } else {
269                        existing.clone()
270                    }
271                } else {
272                    existing.clone()
273                }
274            }
275            None => {
276                let new_provider = self.new_provider_from_request(catalog, request);
277                plan.actions.push(ProvisionAction::AddProvider {
278                    provider: new_provider.clone(),
279                    reason: "provider missing and generated from template".to_owned(),
280                });
281                new_provider
282            }
283        };
284
285        let channel = match catalog.channel(&request.channel_id) {
286            Some(existing) => {
287                let mut patched = existing.clone();
288                let mut changed = false;
289                if patched.provider_id != provider.provider_id {
290                    patched.provider_id = provider.provider_id.clone();
291                    changed = true;
292                }
293                if !patched.enabled {
294                    patched.enabled = true;
295                    changed = true;
296                }
297                if let Some(endpoint) = request.endpoint.as_deref()
298                    && patched.endpoint != endpoint
299                {
300                    patched.endpoint = endpoint.to_owned();
301                    changed = true;
302                }
303
304                if changed {
305                    plan.actions.push(ProvisionAction::PatchChannel {
306                        channel: patched.clone(),
307                        reason: "channel requires repair for provider binding or endpoint"
308                            .to_owned(),
309                    });
310                    patched
311                } else {
312                    existing.clone()
313                }
314            }
315            None => {
316                let new_channel = ChannelConfig {
317                    channel_id: request.channel_id.clone(),
318                    provider_id: provider.provider_id.clone(),
319                    endpoint: request.endpoint.clone().unwrap_or_else(|| {
320                        format!(
321                            "https://{}.local/{}/invoke",
322                            provider.provider_id, request.channel_id
323                        )
324                    }),
325                    enabled: true,
326                    metadata: BTreeMap::new(),
327                };
328                plan.actions.push(ProvisionAction::AddChannel {
329                    channel: new_channel.clone(),
330                    reason: "channel missing and generated from provider defaults".to_owned(),
331                });
332                new_channel
333            }
334        };
335
336        let connector_name = provider.connector_name;
337        if !pack.allowed_connectors.contains(&connector_name) {
338            plan.pack_connector_additions.insert(connector_name);
339        }
340
341        if !pack
342            .granted_capabilities
343            .contains(&Capability::InvokeConnector)
344        {
345            plan.pack_capability_additions
346                .insert(Capability::InvokeConnector);
347        }
348        for capability in &request.required_capabilities {
349            if !pack.granted_capabilities.contains(capability) {
350                plan.pack_capability_additions.insert(*capability);
351            }
352        }
353
354        if !channel.enabled {
355            return Err(IntegrationError::ChannelDisabled(channel.channel_id));
356        }
357
358        Ok(plan)
359    }
360
361    fn new_provider_from_request(
362        &self,
363        catalog: &IntegrationCatalog,
364        request: &AutoProvisionRequest,
365    ) -> ProviderConfig {
366        if let Some(template) = catalog.template(&request.provider_id) {
367            ProviderConfig {
368                provider_id: template.provider_id.clone(),
369                connector_name: request
370                    .connector_name
371                    .clone()
372                    .unwrap_or_else(|| template.default_connector_name.clone()),
373                version: template.default_version.clone(),
374                metadata: template.metadata.clone(),
375            }
376        } else {
377            ProviderConfig {
378                provider_id: request.provider_id.clone(),
379                connector_name: request
380                    .connector_name
381                    .clone()
382                    .unwrap_or_else(|| format!("{}-connector", request.provider_id)),
383                version: "0.1.0".to_owned(),
384                metadata: BTreeMap::from([("source".to_owned(), "auto-generated".to_owned())]),
385            }
386        }
387    }
388}
389
// Unit tests for the catalog, hotfixes and the auto-provision planner.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        contracts::{ExecutionRoute, HarnessKind},
        pack::VerticalPackManifest,
    };

    /// A pack with no allowed connectors and no granted capabilities, so the
    /// planner has to propose everything.
    fn sample_pack() -> VerticalPackManifest {
        VerticalPackManifest {
            pack_id: "sample-pack".to_owned(),
            domain: "engineering".to_owned(),
            version: "0.1.0".to_owned(),
            default_route: ExecutionRoute {
                harness_kind: HarnessKind::EmbeddedPi,
                adapter: Some("pi-local".to_owned()),
            },
            allowed_connectors: BTreeSet::new(),
            granted_capabilities: BTreeSet::new(),
            metadata: BTreeMap::new(),
        }
    }

    #[test]
    fn agent_plans_missing_provider_and_channel_then_pack_is_extended() {
        let agent = AutoProvisionAgent::new();
        let mut catalog = IntegrationCatalog::new();
        catalog.register_template(ProviderTemplate {
            provider_id: "openai".to_owned(),
            default_connector_name: "openai".to_owned(),
            default_version: "1.0.0".to_owned(),
            metadata: BTreeMap::from([("class".to_owned(), "llm".to_owned())]),
        });
        let mut pack = sample_pack();

        let request = AutoProvisionRequest {
            provider_id: "openai".to_owned(),
            channel_id: "chat-main".to_owned(),
            connector_name: None,
            endpoint: Some("https://api.openai.com/v1/chat/completions".to_owned()),
            required_capabilities: BTreeSet::from([
                Capability::InvokeTool,
                Capability::ObserveTelemetry,
            ]),
        };
        let plan = agent
            .plan(&catalog, &pack, &request)
            .expect("plan should succeed");
        assert!(!plan.is_noop());
        // One AddProvider followed by one AddChannel.
        assert_eq!(plan.actions.len(), 2);
        assert!(matches!(
            plan.actions[0],
            ProvisionAction::AddProvider { .. }
        ));
        assert!(matches!(
            plan.actions[1],
            ProvisionAction::AddChannel { .. }
        ));
        assert!(plan.pack_connector_additions.contains("openai"));
        assert!(
            plan.pack_capability_additions
                .contains(&Capability::InvokeConnector)
        );
        assert!(
            plan.pack_capability_additions
                .contains(&Capability::InvokeTool)
        );
        assert!(
            plan.pack_capability_additions
                .contains(&Capability::ObserveTelemetry)
        );

        catalog
            .apply_plan(&mut pack, &plan)
            .expect("apply plan should succeed");

        let provider = catalog.provider("openai").expect("provider should exist");
        assert_eq!(provider.connector_name, "openai");
        assert_eq!(provider.version, "1.0.0");

        let channel = catalog.channel("chat-main").expect("channel should exist");
        assert_eq!(channel.provider_id, "openai");
        assert_eq!(
            channel.endpoint,
            "https://api.openai.com/v1/chat/completions"
        );
        assert!(channel.enabled);

        assert!(pack.allowed_connectors.contains("openai"));
        assert!(
            pack.granted_capabilities
                .contains(&Capability::InvokeConnector)
        );
        assert!(pack.granted_capabilities.contains(&Capability::InvokeTool));
        assert!(
            pack.granted_capabilities
                .contains(&Capability::ObserveTelemetry)
        );

        // Planning the same request again must find nothing left to do.
        let replanned = agent
            .plan(&catalog, &pack, &request)
            .expect("re-plan should succeed");
        assert!(replanned.is_noop());
    }

    #[test]
    fn hotfix_can_patch_channel_endpoint_without_reboot() {
        let mut catalog = IntegrationCatalog::new();
        catalog.upsert_provider(ProviderConfig {
            provider_id: "slack".to_owned(),
            connector_name: "slack".to_owned(),
            version: "1.0.0".to_owned(),
            metadata: BTreeMap::new(),
        });
        catalog.upsert_channel(ChannelConfig {
            channel_id: "alerts".to_owned(),
            provider_id: "slack".to_owned(),
            endpoint: "https://old.example/alerts".to_owned(),
            enabled: true,
            metadata: BTreeMap::new(),
        });

        catalog
            .apply_hotfix(&IntegrationHotfix::ChannelEndpoint {
                channel_id: "alerts".to_owned(),
                new_endpoint: "https://new.example/alerts".to_owned(),
            })
            .expect("hotfix should succeed");

        let channel = catalog.channel("alerts").expect("channel should exist");
        assert_eq!(channel.endpoint, "https://new.example/alerts");
        // Two upserts plus one hotfix: exactly three revision bumps.
        assert_eq!(catalog.revision(), 3);
    }

    #[test]
    fn hotfix_on_unknown_channel_is_rejected_and_revision_unchanged() {
        let mut catalog = IntegrationCatalog::new();

        let outcome = catalog.apply_hotfix(&IntegrationHotfix::ChannelEnabled {
            channel_id: "missing".to_owned(),
            enabled: false,
        });

        assert!(matches!(
            outcome,
            Err(IntegrationError::ChannelNotFound(ref id)) if id == "missing"
        ));
        assert_eq!(catalog.revision(), 0);
    }

    #[test]
    fn planner_repairs_disabled_channel() {
        let agent = AutoProvisionAgent::new();
        let mut catalog = IntegrationCatalog::new();
        catalog.upsert_provider(ProviderConfig {
            provider_id: "github".to_owned(),
            connector_name: "github".to_owned(),
            version: "1.0.0".to_owned(),
            metadata: BTreeMap::new(),
        });
        catalog.upsert_channel(ChannelConfig {
            channel_id: "webhooks".to_owned(),
            provider_id: "github".to_owned(),
            endpoint: "https://api.github.com/webhooks".to_owned(),
            enabled: false,
            metadata: BTreeMap::new(),
        });

        let plan = agent
            .plan(
                &catalog,
                &sample_pack(),
                &AutoProvisionRequest {
                    provider_id: "github".to_owned(),
                    channel_id: "webhooks".to_owned(),
                    connector_name: None,
                    endpoint: None,
                    required_capabilities: BTreeSet::new(),
                },
            )
            .expect("plan should succeed");

        // The repair must actually re-enable the channel and leave the
        // endpoint alone, not merely emit some PatchChannel action.
        assert!(plan.actions.iter().any(|action| matches!(
            action,
            ProvisionAction::PatchChannel { channel, .. }
                if channel.enabled
                    && channel.channel_id == "webhooks"
                    && channel.endpoint == "https://api.github.com/webhooks"
        )));
        // The provider already exists and needs no change.
        assert!(!plan.actions.iter().any(|action| matches!(
            action,
            ProvisionAction::AddProvider { .. } | ProvisionAction::PatchProvider { .. }
        )));
    }
}
529                .iter()
530                .any(|action| matches!(action, ProvisionAction::PatchChannel { .. }))
531        );
532    }
533}