feagi_agent/sdk/base/
topology.rs1use std::collections::HashMap;
4use std::sync::Arc;
5
6use std::sync::RwLock;
7
8use crate::core::SdkError;
9use crate::sdk::types::CorticalID;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub struct CorticalTopology {
14 pub width: u32,
16 pub height: u32,
18 pub depth: u32,
20 pub channels: u32,
22}
23
24#[derive(Debug, Clone)]
26pub struct TopologyCache {
27 host: String,
28 port: u16,
29 #[allow(dead_code)]
30 timeout: std::time::Duration,
31 cache: Arc<RwLock<HashMap<CorticalID, CorticalTopology>>>,
32 #[cfg(feature = "sdk-io")]
33 client: reqwest::Client,
34}
35
36impl TopologyCache {
37 pub fn new(host: impl Into<String>, port: u16, timeout_s: f64) -> Result<Self, SdkError> {
39 let host = host.into();
40 let timeout = std::time::Duration::from_secs_f64(timeout_s);
41 #[cfg(feature = "sdk-io")]
42 let client = reqwest::Client::builder()
43 .timeout(timeout)
44 .build()
45 .map_err(|e| SdkError::Other(format!("TopologyCache HTTP client init failed: {e}")))?;
46 Ok(Self {
47 host,
48 port,
49 timeout,
50 cache: Arc::new(RwLock::new(HashMap::new())),
51 #[cfg(feature = "sdk-io")]
52 client,
53 })
54 }
55
56 #[cfg(feature = "sdk-io")]
58 pub async fn get_topology(&self, id: &CorticalID) -> Result<CorticalTopology, SdkError> {
59 if let Ok(cache) = self.cache.read() {
60 if let Some(existing) = cache.get(id).copied() {
61 return Ok(existing);
62 }
63 }
64 let payload = self.fetch_topologies(&[id.as_base_64()]).await?;
65 let topo = Self::parse_topology_payload(id, &payload)?;
66 if let Ok(mut cache) = self.cache.write() {
67 cache.insert(*id, topo);
68 }
69 Ok(topo)
70 }
71
72 #[cfg(feature = "sdk-io")]
74 pub async fn get_topologies(
75 &self,
76 ids: &[CorticalID],
77 ) -> Result<Vec<CorticalTopology>, SdkError> {
78 let missing: Vec<String> = {
79 let cache = self
80 .cache
81 .read()
82 .map_err(|_| SdkError::Other("Topology cache lock poisoned".to_string()))?;
83 ids.iter()
84 .filter(|id| !cache.contains_key(*id))
85 .map(|id| id.as_base_64())
86 .collect()
87 };
88
89 if !missing.is_empty() {
90 let payload = self.fetch_topologies(&missing).await?;
91 let mut cache = self
92 .cache
93 .write()
94 .map_err(|_| SdkError::Other("Topology cache lock poisoned".to_string()))?;
95 for id in ids {
96 if cache.contains_key(id) {
97 continue;
98 }
99 let topo = Self::parse_topology_payload(id, &payload)?;
100 cache.insert(*id, topo);
101 }
102 }
103
104 let cache = self
105 .cache
106 .read()
107 .map_err(|_| SdkError::Other("Topology cache lock poisoned".to_string()))?;
108 ids.iter()
109 .map(|id| {
110 cache
111 .get(id)
112 .copied()
113 .ok_or_else(|| SdkError::Other(format!("Topology missing in cache: {}", id)))
114 })
115 .collect()
116 }
117
118 #[cfg(feature = "sdk-io")]
120 pub async fn prefetch(&self, ids: &[CorticalID]) -> Result<(), SdkError> {
121 self.get_topologies(ids).await.map(|_| ())
122 }
123
124 pub fn clear_cache(&self) {
126 if let Ok(mut cache) = self.cache.write() {
127 cache.clear();
128 }
129 }
130
131 pub fn cache_size(&self) -> usize {
133 self.cache.read().map(|c| c.len()).unwrap_or(0)
134 }
135
136 pub fn parse_topology_payload(
138 id: &CorticalID,
139 payload: &serde_json::Value,
140 ) -> Result<CorticalTopology, SdkError> {
141 let key = id.as_base_64();
142 let entry = payload
143 .get(&key)
144 .ok_or_else(|| SdkError::Other(format!("Topology payload missing key: {key}")))?;
145 let entry_obj = entry.as_object().ok_or_else(|| {
146 SdkError::Other(format!("Topology entry is not an object for key: {key}"))
147 })?;
148
149 let (dims, channels) = Self::parse_dimensions(entry_obj).ok_or_else(|| {
150 SdkError::Other(format!("Topology dimensions missing for key: {key}"))
151 })?;
152
153 Ok(CorticalTopology {
154 width: dims.0,
155 height: dims.1,
156 depth: dims.2,
157 channels,
158 })
159 }
160
161 #[cfg(feature = "sdk-io")]
162 async fn fetch_topologies(
163 &self,
164 cortical_ids: &[String],
165 ) -> Result<serde_json::Value, SdkError> {
166 let url = format!(
167 "http://{}:{}/v1/cortical_area/multi/cortical_area_properties",
168 self.host, self.port
169 );
170 let response = self
171 .client
172 .post(url)
173 .json(&serde_json::Value::Array(
174 cortical_ids.iter().map(|id| id.clone().into()).collect(),
175 ))
176 .send()
177 .await
178 .map_err(|e| SdkError::Other(format!("Topology request failed: {e}")))?;
179
180 let response = response
181 .error_for_status()
182 .map_err(|e| SdkError::Other(format!("Topology response error: {e}")))?;
183
184 response
185 .json::<serde_json::Value>()
186 .await
187 .map_err(|e| SdkError::Other(format!("Topology response parse failed: {e}")))
188 }
189
190 fn parse_dimensions(
191 entry_obj: &serde_json::Map<String, serde_json::Value>,
192 ) -> Option<((u32, u32, u32), u32)> {
193 let properties = entry_obj
194 .get("properties")
195 .and_then(|value| value.as_object());
196
197 let per_device_dims = entry_obj
198 .get("cortical_dimensions_per_device")
199 .and_then(Self::parse_dim_array)
200 .or_else(|| {
201 properties
202 .and_then(|props| props.get("cortical_dimensions_per_device"))
203 .and_then(Self::parse_dim_array)
204 });
205
206 let dev_count = entry_obj
207 .get("dev_count")
208 .and_then(|value| value.as_u64())
209 .map(|value| value as u32)
210 .or_else(|| {
211 properties
212 .and_then(|props| props.get("dev_count"))
213 .and_then(|value| value.as_u64())
214 .map(|value| value as u32)
215 });
216
217 let total_dims = entry_obj
218 .get("cortical_dimensions")
219 .or_else(|| entry_obj.get("dimensions"))
220 .or_else(|| {
221 properties
222 .and_then(|props| props.get("cortical_dimensions"))
223 .or_else(|| properties.and_then(|props| props.get("dimensions")))
224 })
225 .and_then(|dim_val| {
226 if dim_val.is_array() {
227 Self::parse_dim_array(dim_val)
228 } else if dim_val.is_object() {
229 Self::parse_dim_object(dim_val)
230 } else {
231 None
232 }
233 });
234
235 if let (Some(dims), Some(channels)) = (per_device_dims, dev_count) {
236 return Some((dims, channels));
237 }
238
239 if let (Some(dims), Some(total_dims)) = (per_device_dims, total_dims) {
240 if dims.0 > 0 && total_dims.0 % dims.0 == 0 {
241 let channels = total_dims.0 / dims.0;
242 if channels > 0 {
243 return Some((dims, channels));
244 }
245 }
246 }
247
248 if let (Some(total_dims), Some(channels)) = (total_dims, dev_count) {
249 if channels > 0 && total_dims.0 % channels == 0 {
250 let per_device = (total_dims.0 / channels, total_dims.1, total_dims.2);
251 return Some((per_device, channels));
252 }
253 }
254
255 if let Some(total_dims) = total_dims {
256 return Some((total_dims, 1));
257 }
258
259 None
261 }
262
263 fn parse_dim_array(value: &serde_json::Value) -> Option<(u32, u32, u32)> {
264 let arr = value.as_array()?;
265 if arr.len() != 3 {
266 return None;
267 }
268 let w = arr[0].as_u64()? as u32;
269 let h = arr[1].as_u64()? as u32;
270 let d = arr[2].as_u64()? as u32;
271 Some((w, h, d))
272 }
273
274 fn parse_dim_object(value: &serde_json::Value) -> Option<(u32, u32, u32)> {
275 let obj = value.as_object()?;
276 let w = obj.get("x")?.as_u64()? as u32;
277 let h = obj.get("y")?.as_u64()? as u32;
278 let d = obj.get("z")?.as_u64()? as u32;
279 Some((w, h, d))
280 }
281}