Skip to main content

feagi_agent/sdk/base/
topology.rs

1//! Cortical topology access and caching.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use std::sync::RwLock;
7
8use crate::core::SdkError;
9use crate::sdk::types::CorticalID;
10
11/// Parsed cortical topology for a single cortical area.
12#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13pub struct CorticalTopology {
14    /// X dimension (width) per device.
15    pub width: u32,
16    /// Y dimension (height) per device.
17    pub height: u32,
18    /// Z dimension (depth) per device.
19    pub depth: u32,
20    /// Number of channels/devices.
21    pub channels: u32,
22}
23
24/// HTTP-backed topology cache for cortical areas.
25#[derive(Debug, Clone)]
26pub struct TopologyCache {
27    host: String,
28    port: u16,
29    #[allow(dead_code)]
30    timeout: std::time::Duration,
31    cache: Arc<RwLock<HashMap<CorticalID, CorticalTopology>>>,
32    #[cfg(feature = "sdk-io")]
33    client: reqwest::Client,
34}
35
36impl TopologyCache {
37    /// Create a new topology cache for a FEAGI HTTP endpoint.
38    pub fn new(host: impl Into<String>, port: u16, timeout_s: f64) -> Result<Self, SdkError> {
39        let host = host.into();
40        let timeout = std::time::Duration::from_secs_f64(timeout_s);
41        #[cfg(feature = "sdk-io")]
42        let client = reqwest::Client::builder()
43            .timeout(timeout)
44            .build()
45            .map_err(|e| SdkError::Other(format!("TopologyCache HTTP client init failed: {e}")))?;
46        Ok(Self {
47            host,
48            port,
49            timeout,
50            cache: Arc::new(RwLock::new(HashMap::new())),
51            #[cfg(feature = "sdk-io")]
52            client,
53        })
54    }
55
56    /// Fetch and cache topology for a single cortical ID.
57    #[cfg(feature = "sdk-io")]
58    pub async fn get_topology(&self, id: &CorticalID) -> Result<CorticalTopology, SdkError> {
59        if let Ok(cache) = self.cache.read() {
60            if let Some(existing) = cache.get(id).copied() {
61                return Ok(existing);
62            }
63        }
64        let payload = self.fetch_topologies(&[id.as_base_64()]).await?;
65        let topo = Self::parse_topology_payload(id, &payload)?;
66        if let Ok(mut cache) = self.cache.write() {
67            cache.insert(*id, topo);
68        }
69        Ok(topo)
70    }
71
72    /// Fetch and cache topologies for multiple cortical IDs.
73    #[cfg(feature = "sdk-io")]
74    pub async fn get_topologies(
75        &self,
76        ids: &[CorticalID],
77    ) -> Result<Vec<CorticalTopology>, SdkError> {
78        let missing: Vec<String> = {
79            let cache = self
80                .cache
81                .read()
82                .map_err(|_| SdkError::Other("Topology cache lock poisoned".to_string()))?;
83            ids.iter()
84                .filter(|id| !cache.contains_key(*id))
85                .map(|id| id.as_base_64())
86                .collect()
87        };
88
89        if !missing.is_empty() {
90            let payload = self.fetch_topologies(&missing).await?;
91            let mut cache = self
92                .cache
93                .write()
94                .map_err(|_| SdkError::Other("Topology cache lock poisoned".to_string()))?;
95            for id in ids {
96                if cache.contains_key(id) {
97                    continue;
98                }
99                let topo = Self::parse_topology_payload(id, &payload)?;
100                cache.insert(*id, topo);
101            }
102        }
103
104        let cache = self
105            .cache
106            .read()
107            .map_err(|_| SdkError::Other("Topology cache lock poisoned".to_string()))?;
108        ids.iter()
109            .map(|id| {
110                cache
111                    .get(id)
112                    .copied()
113                    .ok_or_else(|| SdkError::Other(format!("Topology missing in cache: {}", id)))
114            })
115            .collect()
116    }
117
118    /// Prefetch and cache topologies for the provided cortical IDs.
119    #[cfg(feature = "sdk-io")]
120    pub async fn prefetch(&self, ids: &[CorticalID]) -> Result<(), SdkError> {
121        self.get_topologies(ids).await.map(|_| ())
122    }
123
124    /// Clear all cached topology entries.
125    pub fn clear_cache(&self) {
126        if let Ok(mut cache) = self.cache.write() {
127            cache.clear();
128        }
129    }
130
131    /// Return the number of cached cortical topologies.
132    pub fn cache_size(&self) -> usize {
133        self.cache.read().map(|c| c.len()).unwrap_or(0)
134    }
135
136    /// Parse a topology payload returned by FEAGI HTTP APIs.
137    pub fn parse_topology_payload(
138        id: &CorticalID,
139        payload: &serde_json::Value,
140    ) -> Result<CorticalTopology, SdkError> {
141        let key = id.as_base_64();
142        let entry = payload
143            .get(&key)
144            .ok_or_else(|| SdkError::Other(format!("Topology payload missing key: {key}")))?;
145        let entry_obj = entry.as_object().ok_or_else(|| {
146            SdkError::Other(format!("Topology entry is not an object for key: {key}"))
147        })?;
148
149        let (dims, channels) = Self::parse_dimensions(entry_obj).ok_or_else(|| {
150            SdkError::Other(format!("Topology dimensions missing for key: {key}"))
151        })?;
152
153        Ok(CorticalTopology {
154            width: dims.0,
155            height: dims.1,
156            depth: dims.2,
157            channels,
158        })
159    }
160
161    #[cfg(feature = "sdk-io")]
162    async fn fetch_topologies(
163        &self,
164        cortical_ids: &[String],
165    ) -> Result<serde_json::Value, SdkError> {
166        let url = format!(
167            "http://{}:{}/v1/cortical_area/multi/cortical_area_properties",
168            self.host, self.port
169        );
170        let response = self
171            .client
172            .post(url)
173            .json(&serde_json::Value::Array(
174                cortical_ids.iter().map(|id| id.clone().into()).collect(),
175            ))
176            .send()
177            .await
178            .map_err(|e| SdkError::Other(format!("Topology request failed: {e}")))?;
179
180        let response = response
181            .error_for_status()
182            .map_err(|e| SdkError::Other(format!("Topology response error: {e}")))?;
183
184        response
185            .json::<serde_json::Value>()
186            .await
187            .map_err(|e| SdkError::Other(format!("Topology response parse failed: {e}")))
188    }
189
190    fn parse_dimensions(
191        entry_obj: &serde_json::Map<String, serde_json::Value>,
192    ) -> Option<((u32, u32, u32), u32)> {
193        let properties = entry_obj
194            .get("properties")
195            .and_then(|value| value.as_object());
196
197        let per_device_dims = entry_obj
198            .get("cortical_dimensions_per_device")
199            .and_then(Self::parse_dim_array)
200            .or_else(|| {
201                properties
202                    .and_then(|props| props.get("cortical_dimensions_per_device"))
203                    .and_then(Self::parse_dim_array)
204            });
205
206        let dev_count = entry_obj
207            .get("dev_count")
208            .and_then(|value| value.as_u64())
209            .map(|value| value as u32)
210            .or_else(|| {
211                properties
212                    .and_then(|props| props.get("dev_count"))
213                    .and_then(|value| value.as_u64())
214                    .map(|value| value as u32)
215            });
216
217        let total_dims = entry_obj
218            .get("cortical_dimensions")
219            .or_else(|| entry_obj.get("dimensions"))
220            .or_else(|| {
221                properties
222                    .and_then(|props| props.get("cortical_dimensions"))
223                    .or_else(|| properties.and_then(|props| props.get("dimensions")))
224            })
225            .and_then(|dim_val| {
226                if dim_val.is_array() {
227                    Self::parse_dim_array(dim_val)
228                } else if dim_val.is_object() {
229                    Self::parse_dim_object(dim_val)
230                } else {
231                    None
232                }
233            });
234
235        if let (Some(dims), Some(channels)) = (per_device_dims, dev_count) {
236            return Some((dims, channels));
237        }
238
239        if let (Some(dims), Some(total_dims)) = (per_device_dims, total_dims) {
240            if dims.0 > 0 && total_dims.0 % dims.0 == 0 {
241                let channels = total_dims.0 / dims.0;
242                if channels > 0 {
243                    return Some((dims, channels));
244                }
245            }
246        }
247
248        if let (Some(total_dims), Some(channels)) = (total_dims, dev_count) {
249            if channels > 0 && total_dims.0 % channels == 0 {
250                let per_device = (total_dims.0 / channels, total_dims.1, total_dims.2);
251                return Some((per_device, channels));
252            }
253        }
254
255        if let Some(total_dims) = total_dims {
256            return Some((total_dims, 1));
257        }
258
259        // TODO: Support alternate topology payload shapes if API expands fields.
260        None
261    }
262
263    fn parse_dim_array(value: &serde_json::Value) -> Option<(u32, u32, u32)> {
264        let arr = value.as_array()?;
265        if arr.len() != 3 {
266            return None;
267        }
268        let w = arr[0].as_u64()? as u32;
269        let h = arr[1].as_u64()? as u32;
270        let d = arr[2].as_u64()? as u32;
271        Some((w, h, d))
272    }
273
274    fn parse_dim_object(value: &serde_json::Value) -> Option<(u32, u32, u32)> {
275        let obj = value.as_object()?;
276        let w = obj.get("x")?.as_u64()? as u32;
277        let h = obj.get("y")?.as_u64()? as u32;
278        let d = obj.get("z")?.as_u64()? as u32;
279        Some((w, h, d))
280    }
281}