loong_kernel/
connector.rs1use std::{any::Any, collections::BTreeMap, panic::AssertUnwindSafe, sync::Arc};
2
3use async_trait::async_trait;
4use futures_util::FutureExt;
5
6use crate::{
7 contracts::{ConnectorCommand, ConnectorOutcome},
8 errors::ConnectorError,
9};
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum ConnectorTier {
13 Core,
14 Extension,
15}
16
17impl ConnectorTier {
18 const fn as_error_scope(self) -> &'static str {
19 match self {
20 Self::Core => "connector core adapter",
21 Self::Extension => "connector extension adapter",
22 }
23 }
24}
25
26#[async_trait]
27pub trait CoreConnectorAdapter: Send + Sync {
28 fn name(&self) -> &str;
29 async fn invoke_core(
30 &self,
31 command: ConnectorCommand,
32 ) -> Result<ConnectorOutcome, ConnectorError>;
33}
34
35#[async_trait]
36pub trait ConnectorExtensionAdapter: Send + Sync {
37 fn name(&self) -> &str;
38 async fn invoke_extension(
39 &self,
40 command: ConnectorCommand,
41 core: &(dyn CoreConnectorAdapter + Sync),
42 ) -> Result<ConnectorOutcome, ConnectorError>;
43}
44
45#[derive(Default)]
46pub struct ConnectorPlane {
47 core_adapters: BTreeMap<String, Arc<dyn CoreConnectorAdapter>>,
48 extension_adapters: BTreeMap<String, Arc<dyn ConnectorExtensionAdapter>>,
49 default_core_adapter: Option<String>,
50}
51
52struct PanicIsolatedCoreConnector {
53 adapter_name: String,
54 adapter: Arc<dyn CoreConnectorAdapter>,
55}
56
57impl PanicIsolatedCoreConnector {
58 fn new(adapter_name: String, adapter: Arc<dyn CoreConnectorAdapter>) -> Self {
59 Self {
60 adapter_name,
61 adapter,
62 }
63 }
64}
65
66#[async_trait]
67impl CoreConnectorAdapter for PanicIsolatedCoreConnector {
68 fn name(&self) -> &str {
69 &self.adapter_name
70 }
71
72 async fn invoke_core(
73 &self,
74 command: ConnectorCommand,
75 ) -> Result<ConnectorOutcome, ConnectorError> {
76 let invocation = self.adapter.invoke_core(command);
77 return execute_connector_invocation(&self.adapter_name, ConnectorTier::Core, invocation)
78 .await;
79 }
80}
81
82impl ConnectorPlane {
83 #[must_use]
84 pub fn new() -> Self {
85 Self {
86 core_adapters: BTreeMap::new(),
87 extension_adapters: BTreeMap::new(),
88 default_core_adapter: None,
89 }
90 }
91
92 pub fn register_core_adapter<A: CoreConnectorAdapter + 'static>(&mut self, adapter: A) {
93 let name = adapter.name().to_owned();
94 if self.default_core_adapter.is_none() {
95 self.default_core_adapter = Some(name.clone());
96 }
97 self.core_adapters.insert(name, Arc::new(adapter));
98 }
99
100 pub fn register_extension_adapter<A: ConnectorExtensionAdapter + 'static>(
101 &mut self,
102 adapter: A,
103 ) {
104 let name = adapter.name().to_owned();
105 self.extension_adapters.insert(name, Arc::new(adapter));
106 }
107
108 pub fn set_default_core_adapter(&mut self, name: &str) -> Result<(), ConnectorError> {
109 if !self.core_adapters.contains_key(name) {
110 return Err(ConnectorError::CoreAdapterNotFound(name.to_owned()));
111 }
112 self.default_core_adapter = Some(name.to_owned());
113 Ok(())
114 }
115
116 #[must_use]
117 pub fn default_core_adapter_name(&self) -> Option<&str> {
118 self.default_core_adapter.as_deref()
119 }
120
121 pub async fn invoke_core(
122 &self,
123 core_name: Option<&str>,
124 command: ConnectorCommand,
125 ) -> Result<ConnectorOutcome, ConnectorError> {
126 let resolved_name = if let Some(name) = core_name {
127 name
128 } else {
129 self.default_core_adapter
130 .as_deref()
131 .ok_or(ConnectorError::NoDefaultCoreAdapter)?
132 };
133
134 let core = self
135 .core_adapters
136 .get(resolved_name)
137 .ok_or_else(|| ConnectorError::CoreAdapterNotFound(resolved_name.to_owned()))?
138 .clone();
139
140 let invocation = core.invoke_core(command);
141 return execute_connector_invocation(resolved_name, ConnectorTier::Core, invocation).await;
142 }
143
144 pub async fn invoke_extension(
145 &self,
146 extension_name: &str,
147 core_name: Option<&str>,
148 command: ConnectorCommand,
149 ) -> Result<ConnectorOutcome, ConnectorError> {
150 let extension = self
151 .extension_adapters
152 .get(extension_name)
153 .ok_or_else(|| ConnectorError::ExtensionNotFound(extension_name.to_owned()))?
154 .clone();
155
156 let resolved_core_name = if let Some(name) = core_name {
157 name
158 } else {
159 self.default_core_adapter
160 .as_deref()
161 .ok_or(ConnectorError::NoDefaultCoreAdapter)?
162 };
163
164 let core = self
165 .core_adapters
166 .get(resolved_core_name)
167 .ok_or_else(|| ConnectorError::CoreAdapterNotFound(resolved_core_name.to_owned()))?
168 .clone();
169
170 let guarded_core = PanicIsolatedCoreConnector::new(resolved_core_name.to_owned(), core);
171 let invocation = extension.invoke_extension(command, &guarded_core);
172 return execute_connector_invocation(extension_name, ConnectorTier::Extension, invocation)
173 .await;
174 }
175}
176
177async fn execute_connector_invocation<F>(
178 adapter_name: &str,
179 tier: ConnectorTier,
180 invocation: F,
181) -> Result<ConnectorOutcome, ConnectorError>
182where
183 F: std::future::Future<Output = Result<ConnectorOutcome, ConnectorError>>,
184{
185 let guarded_invocation = AssertUnwindSafe(invocation);
186 let panic_result = guarded_invocation.catch_unwind().await;
187
188 match panic_result {
189 Ok(outcome) => outcome,
190 Err(panic_payload) => {
191 let panic_message =
192 format_connector_invocation_panic(adapter_name, tier, panic_payload);
193 Err(ConnectorError::Execution(panic_message))
194 }
195 }
196}
197
198fn format_connector_invocation_panic(
199 adapter_name: &str,
200 tier: ConnectorTier,
201 panic_payload: Box<dyn Any + Send>,
202) -> String {
203 let panic_message = extract_connector_panic_message(panic_payload);
204 let scope = tier.as_error_scope();
205
206 match panic_message {
207 Some(message) => format!("{scope} `{adapter_name}` panicked: {message}"),
208 None => format!("{scope} `{adapter_name}` panicked"),
209 }
210}
211
212fn extract_connector_panic_message(panic_payload: Box<dyn Any + Send>) -> Option<String> {
213 let panic_payload = match panic_payload.downcast::<String>() {
214 Ok(message) => return Some(*message),
215 Err(panic_payload) => panic_payload,
216 };
217
218 match panic_payload.downcast::<&'static str>() {
219 Ok(message) => Some((*message).to_owned()),
220 Err(_) => None,
221 }
222}