oats_framework/
systems.rs1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::sync::RwLock;
6use crate::{Result, Object, OatsError};
7use crate::actions::ActionResult;
8
9pub type SystemId = uuid::Uuid;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
14pub enum Priority {
15 Low = 1,
16 Normal = 2,
17 High = 3,
18 Critical = 4,
19}
20
21impl Default for Priority {
22 fn default() -> Self {
23 Self::Normal
24 }
25}
26
27#[async_trait]
29pub trait System: Send + Sync {
30 fn name(&self) -> &str;
32
33 fn description(&self) -> &str;
35
36 async fn initialize(&mut self) -> Result<()> {
38 Ok(())
39 }
40
41 async fn shutdown(&mut self) -> Result<()> {
43 Ok(())
44 }
45
46 async fn process(&mut self, objects: Vec<Object>, priority: Priority) -> Result<Vec<ActionResult>>;
48
49 fn priority(&self) -> Priority {
51 Priority::Normal
52 }
53
54 fn is_ready(&self) -> bool {
56 true
57 }
58
59 fn get_stats(&self) -> SystemStats {
61 SystemStats::default()
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize, Default)]
67pub struct SystemStats {
68 pub objects_processed: u64,
70 pub actions_executed: u64,
72 pub errors: u64,
74 pub total_processing_time_ms: u64,
76 pub last_processed: Option<chrono::DateTime<chrono::Utc>>,
78 pub avg_processing_time_ms: f64,
80 pub peak_processing_time_ms: u64,
82}
83
84impl SystemStats {
85 pub fn update_processing_time(&mut self, processing_time_ms: u64) {
87 self.total_processing_time_ms += processing_time_ms;
88 self.peak_processing_time_ms = self.peak_processing_time_ms.max(processing_time_ms);
89
90 if self.objects_processed > 0 {
91 self.avg_processing_time_ms = self.total_processing_time_ms as f64 / self.objects_processed as f64;
92 }
93 }
94
95 pub fn throughput_objects_per_second(&self) -> f64 {
97 if self.total_processing_time_ms > 0 {
98 (self.objects_processed as f64 * 1000.0) / self.total_processing_time_ms as f64
99 } else {
100 0.0
101 }
102 }
103
104 pub fn reset(&mut self) {
106 self.objects_processed = 0;
107 self.actions_executed = 0;
108 self.errors = 0;
109 self.total_processing_time_ms = 0;
110 self.avg_processing_time_ms = 0.0;
111 self.peak_processing_time_ms = 0;
112 self.last_processed = None;
113 }
114
115 pub fn error_rate(&self) -> f64 {
117 let total = self.objects_processed + self.actions_executed;
118 if total > 0 {
119 (self.errors as f64 / total as f64) * 100.0
120 } else {
121 0.0
122 }
123 }
124}
125
126pub struct SystemManager {
128 systems: HashMap<String, Box<dyn System>>,
129 object_registry: Arc<RwLock<HashMap<String, Object>>>,
130}
131
132impl SystemManager {
133 pub fn new() -> Self {
135 Self {
136 systems: HashMap::new(),
137 object_registry: Arc::new(RwLock::new(HashMap::with_capacity(100))),
138 }
139 }
140
141 pub fn with_capacity(expected_objects: usize) -> Self {
143 Self {
144 systems: HashMap::new(),
145 object_registry: Arc::new(RwLock::new(HashMap::with_capacity(expected_objects))),
146 }
147 }
148
149 pub fn add_system(&mut self, system: Box<dyn System>) {
151 let name = system.name().to_string();
152 self.systems.insert(name, system);
153 }
154
155 pub fn remove_system(&mut self, name: &str) -> Option<Box<dyn System>> {
157 self.systems.remove(name)
158 }
159
160 pub fn get_system(&self, name: &str) -> Option<&Box<dyn System>> {
162 self.systems.get(name)
163 }
164
165 pub fn systems(&self) -> &HashMap<String, Box<dyn System>> {
167 &self.systems
168 }
169
170 pub fn system_count(&self) -> usize {
172 self.systems.len()
173 }
174
175 pub async fn register_object(&self, object: Object) {
177 let mut registry = self.object_registry.write().await;
178 registry.insert(object.id.to_string(), object);
179 }
180
181 pub async fn get_object(&self, id: &str) -> Option<Object> {
183 let registry = self.object_registry.read().await;
184 registry.get(id).cloned()
185 }
186
187 pub async fn get_all_objects(&self) -> Vec<Object> {
189 let registry = self.object_registry.read().await;
190 registry.values().cloned().collect()
191 }
192
193 pub async fn object_count(&self) -> usize {
195 let registry = self.object_registry.read().await;
196 registry.len()
197 }
198
199 pub async fn clear_objects(&self) {
201 let mut registry = self.object_registry.write().await;
202 registry.clear();
203 }
204
205 pub async fn reserve_objects(&self, additional: usize) {
207 let mut registry = self.object_registry.write().await;
208 registry.reserve(additional);
209 }
210
211 pub async fn process_all(&mut self, priority: Priority) -> Result<Vec<ActionResult>> {
213 let objects = self.get_all_objects().await;
214 let mut all_results = Vec::new();
215
216 let mut system_names: Vec<_> = self.systems.keys().cloned().collect();
218 system_names.sort_by(|a, b| {
219 let a_priority = self.systems.get(a).map(|s| s.priority()).unwrap_or(Priority::Normal);
220 let b_priority = self.systems.get(b).map(|s| s.priority()).unwrap_or(Priority::Normal);
221 b_priority.cmp(&a_priority)
222 });
223
224 for system_name in system_names {
225 if let Some(system) = self.systems.get_mut(&system_name) {
226 if system.is_ready() {
227 match system.process(objects.clone(), priority).await {
228 Ok(results) => all_results.extend(results),
229 Err(e) => {
230 let error_result = ActionResult::failure(format!("System error: {}", e));
231 all_results.push(error_result);
232 }
233 }
234 }
235 }
236 }
237
238 Ok(all_results)
239 }
240
241 pub async fn process_with_system(
243 &mut self,
244 system_name: &str,
245 objects: Vec<Object>,
246 priority: Priority,
247 ) -> Result<Vec<ActionResult>> {
248 let system = self
249 .systems
250 .get_mut(system_name)
251 .ok_or_else(|| OatsError::system_error(format!("System '{}' not found", system_name)))?;
252
253 if !system.is_ready() {
254 return Err(OatsError::system_error("System is not ready"));
255 }
256
257 system.process(objects, priority).await
258 }
259
260 pub async fn initialize_all(&mut self) -> Result<()> {
262 for (name, system) in &mut self.systems {
263 if let Err(e) = system.initialize().await {
264 return Err(OatsError::system_error(format!(
265 "Failed to initialize system '{}': {}",
266 name, e
267 )));
268 }
269 }
270 Ok(())
271 }
272
273 pub async fn shutdown_all(&mut self) -> Result<()> {
275 for (name, system) in &mut self.systems {
276 if let Err(e) = system.shutdown().await {
277 return Err(OatsError::system_error(format!(
278 "Failed to shutdown system '{}': {}",
279 name, e
280 )));
281 }
282 }
283 Ok(())
284 }
285
286 pub fn get_all_stats(&self) -> HashMap<String, SystemStats> {
288 self.systems
289 .iter()
290 .map(|(name, system)| (name.clone(), system.get_stats()))
291 .collect()
292 }
293}
294
295impl Default for SystemManager {
296 fn default() -> Self {
297 Self::new()
298 }
299}
300
301#[cfg(test)]
302mod tests {
303 use super::*;
304
305 #[test]
306 fn test_priority_ordering() {
307 assert!(Priority::Critical > Priority::High);
308 assert!(Priority::High > Priority::Normal);
309 assert!(Priority::Normal > Priority::Low);
310 }
311
312 #[test]
313 fn test_system_stats() {
314 let stats = SystemStats::default();
315 assert_eq!(stats.objects_processed, 0);
316 assert_eq!(stats.actions_executed, 0);
317 assert_eq!(stats.errors, 0);
318 }
319}