Skip to main content

loong_kernel/
connector.rs

1use std::{any::Any, collections::BTreeMap, panic::AssertUnwindSafe, sync::Arc};
2
3use async_trait::async_trait;
4use futures_util::FutureExt;
5
6use crate::{
7    contracts::{ConnectorCommand, ConnectorOutcome},
8    errors::ConnectorError,
9};
10
/// Identifies which kind of adapter an invocation targets.
///
/// The tier is only used to label error messages produced when an adapter
/// panics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectorTier {
    /// The invocation targets a core adapter.
    Core,
    /// The invocation targets an extension adapter.
    Extension,
}

impl ConnectorTier {
    /// Returns the human-readable prefix that names this tier in panic
    /// error messages.
    const fn as_error_scope(self) -> &'static str {
        // Exhaustive on purpose: adding a tier must force a new label here.
        match self {
            Self::Extension => "connector extension adapter",
            Self::Core => "connector core adapter",
        }
    }
}
25
/// A core connector adapter: executes a [`ConnectorCommand`] on its own.
///
/// Implementations must be `Send + Sync`; [`ConnectorPlane`] stores them
/// behind an `Arc`.
#[async_trait]
pub trait CoreConnectorAdapter: Send + Sync {
    /// Name of the adapter. [`ConnectorPlane`] uses it as the registration
    /// and lookup key.
    fn name(&self) -> &str;
    /// Executes `command` and returns its outcome, or a [`ConnectorError`]
    /// if the invocation fails.
    async fn invoke_core(
        &self,
        command: ConnectorCommand,
    ) -> Result<ConnectorOutcome, ConnectorError>;
}
34
/// An extension connector adapter: executes a [`ConnectorCommand`] with
/// access to a core adapter.
///
/// Implementations must be `Send + Sync`; [`ConnectorPlane`] stores them
/// behind an `Arc`.
#[async_trait]
pub trait ConnectorExtensionAdapter: Send + Sync {
    /// Name of the adapter. [`ConnectorPlane`] uses it as the registration
    /// and lookup key.
    fn name(&self) -> &str;
    /// Executes `command` and returns its outcome, or a [`ConnectorError`]
    /// if the invocation fails.
    ///
    /// `core` is the core adapter the caller resolved for this invocation.
    async fn invoke_extension(
        &self,
        command: ConnectorCommand,
        core: &(dyn CoreConnectorAdapter + Sync),
    ) -> Result<ConnectorOutcome, ConnectorError>;
}
44
/// Registry of core and extension connector adapters, keyed by adapter name.
#[derive(Default)]
pub struct ConnectorPlane {
    /// Registered core adapters, keyed by the name each adapter reports.
    core_adapters: BTreeMap<String, Arc<dyn CoreConnectorAdapter>>,
    /// Registered extension adapters, keyed by the name each adapter reports.
    extension_adapters: BTreeMap<String, Arc<dyn ConnectorExtensionAdapter>>,
    /// Name of the core adapter used when an invocation does not name one;
    /// `None` until a core adapter has been registered.
    default_core_adapter: Option<String>,
}
51
52struct PanicIsolatedCoreConnector {
53    adapter_name: String,
54    adapter: Arc<dyn CoreConnectorAdapter>,
55}
56
57impl PanicIsolatedCoreConnector {
58    fn new(adapter_name: String, adapter: Arc<dyn CoreConnectorAdapter>) -> Self {
59        Self {
60            adapter_name,
61            adapter,
62        }
63    }
64}
65
66#[async_trait]
67impl CoreConnectorAdapter for PanicIsolatedCoreConnector {
68    fn name(&self) -> &str {
69        &self.adapter_name
70    }
71
72    async fn invoke_core(
73        &self,
74        command: ConnectorCommand,
75    ) -> Result<ConnectorOutcome, ConnectorError> {
76        let invocation = self.adapter.invoke_core(command);
77        return execute_connector_invocation(&self.adapter_name, ConnectorTier::Core, invocation)
78            .await;
79    }
80}
81
82impl ConnectorPlane {
83    #[must_use]
84    pub fn new() -> Self {
85        Self {
86            core_adapters: BTreeMap::new(),
87            extension_adapters: BTreeMap::new(),
88            default_core_adapter: None,
89        }
90    }
91
92    pub fn register_core_adapter<A: CoreConnectorAdapter + 'static>(&mut self, adapter: A) {
93        let name = adapter.name().to_owned();
94        if self.default_core_adapter.is_none() {
95            self.default_core_adapter = Some(name.clone());
96        }
97        self.core_adapters.insert(name, Arc::new(adapter));
98    }
99
100    pub fn register_extension_adapter<A: ConnectorExtensionAdapter + 'static>(
101        &mut self,
102        adapter: A,
103    ) {
104        let name = adapter.name().to_owned();
105        self.extension_adapters.insert(name, Arc::new(adapter));
106    }
107
108    pub fn set_default_core_adapter(&mut self, name: &str) -> Result<(), ConnectorError> {
109        if !self.core_adapters.contains_key(name) {
110            return Err(ConnectorError::CoreAdapterNotFound(name.to_owned()));
111        }
112        self.default_core_adapter = Some(name.to_owned());
113        Ok(())
114    }
115
116    #[must_use]
117    pub fn default_core_adapter_name(&self) -> Option<&str> {
118        self.default_core_adapter.as_deref()
119    }
120
121    pub async fn invoke_core(
122        &self,
123        core_name: Option<&str>,
124        command: ConnectorCommand,
125    ) -> Result<ConnectorOutcome, ConnectorError> {
126        let resolved_name = if let Some(name) = core_name {
127            name
128        } else {
129            self.default_core_adapter
130                .as_deref()
131                .ok_or(ConnectorError::NoDefaultCoreAdapter)?
132        };
133
134        let core = self
135            .core_adapters
136            .get(resolved_name)
137            .ok_or_else(|| ConnectorError::CoreAdapterNotFound(resolved_name.to_owned()))?
138            .clone();
139
140        let invocation = core.invoke_core(command);
141        return execute_connector_invocation(resolved_name, ConnectorTier::Core, invocation).await;
142    }
143
144    pub async fn invoke_extension(
145        &self,
146        extension_name: &str,
147        core_name: Option<&str>,
148        command: ConnectorCommand,
149    ) -> Result<ConnectorOutcome, ConnectorError> {
150        let extension = self
151            .extension_adapters
152            .get(extension_name)
153            .ok_or_else(|| ConnectorError::ExtensionNotFound(extension_name.to_owned()))?
154            .clone();
155
156        let resolved_core_name = if let Some(name) = core_name {
157            name
158        } else {
159            self.default_core_adapter
160                .as_deref()
161                .ok_or(ConnectorError::NoDefaultCoreAdapter)?
162        };
163
164        let core = self
165            .core_adapters
166            .get(resolved_core_name)
167            .ok_or_else(|| ConnectorError::CoreAdapterNotFound(resolved_core_name.to_owned()))?
168            .clone();
169
170        let guarded_core = PanicIsolatedCoreConnector::new(resolved_core_name.to_owned(), core);
171        let invocation = extension.invoke_extension(command, &guarded_core);
172        return execute_connector_invocation(extension_name, ConnectorTier::Extension, invocation)
173            .await;
174    }
175}
176
177async fn execute_connector_invocation<F>(
178    adapter_name: &str,
179    tier: ConnectorTier,
180    invocation: F,
181) -> Result<ConnectorOutcome, ConnectorError>
182where
183    F: std::future::Future<Output = Result<ConnectorOutcome, ConnectorError>>,
184{
185    let guarded_invocation = AssertUnwindSafe(invocation);
186    let panic_result = guarded_invocation.catch_unwind().await;
187
188    match panic_result {
189        Ok(outcome) => outcome,
190        Err(panic_payload) => {
191            let panic_message =
192                format_connector_invocation_panic(adapter_name, tier, panic_payload);
193            Err(ConnectorError::Execution(panic_message))
194        }
195    }
196}
197
198fn format_connector_invocation_panic(
199    adapter_name: &str,
200    tier: ConnectorTier,
201    panic_payload: Box<dyn Any + Send>,
202) -> String {
203    let panic_message = extract_connector_panic_message(panic_payload);
204    let scope = tier.as_error_scope();
205
206    match panic_message {
207        Some(message) => format!("{scope} `{adapter_name}` panicked: {message}"),
208        None => format!("{scope} `{adapter_name}` panicked"),
209    }
210}
211
/// Recovers the textual message from a panic payload, if it carries one.
///
/// Returns the message when the payload is a `String` or a `&'static str`,
/// and `None` for any other payload type.
fn extract_connector_panic_message(panic_payload: Box<dyn Any + Send>) -> Option<String> {
    panic_payload
        .downcast::<String>()
        .map(|owned| *owned)
        // A failed downcast hands the payload back, so it can be retried as
        // a string literal.
        .or_else(|other| {
            other
                .downcast::<&'static str>()
                .map(|literal| (*literal).to_owned())
        })
        .ok()
}