zoey_core/
ipo.rs

1//! IPO Pattern - Input, Process, Output
2//!
3//! Structured event processing pattern for agent interactions
4
5use crate::types::*;
6use crate::{ZoeyError, Result};
7use std::sync::Arc;
8use tracing::{debug, info};
9
10/// Input stage - receives and validates events
11#[derive(Debug, Clone)]
12pub struct Input {
13    /// Event type
14    pub event_type: EventType,
15
16    /// Raw event data
17    pub event_data: EventPayload,
18
19    /// Input timestamp
20    pub timestamp: i64,
21
22    /// Validation result
23    pub validated: bool,
24
25    /// Compliance check result
26    pub compliance_passed: bool,
27}
28
29/// Process stage - transforms input into actionable items
30#[derive(Debug, Clone)]
31pub struct Process {
32    /// Input that was processed
33    pub input_id: uuid::Uuid,
34
35    /// Planned actions
36    pub planned_actions: Vec<String>,
37
38    /// State composed from providers
39    pub state_hash: String,
40
41    /// Processing decisions
42    pub decisions: Vec<ProcessDecision>,
43
44    /// Risk assessment
45    pub risk_level: String,
46}
47
/// A single decision reached during the process stage.
#[derive(Clone, Debug)]
pub struct ProcessDecision {
    /// Category of the decision (free-form label, e.g. `"RESPOND"`).
    pub decision_type: String,

    /// Human-readable explanation of why the decision was taken.
    pub reasoning: String,

    /// How confident the pipeline is in this decision, intended to lie in
    /// the range `0.0..=1.0`. The range is not enforced by this type.
    pub confidence: f32,
}
60
61/// Output stage - generates and validates responses
62#[derive(Debug, Clone)]
63pub struct Output {
64    /// Process that generated this output
65    pub process_id: uuid::Uuid,
66
67    /// Generated responses
68    pub responses: Vec<Memory>,
69
70    /// PII redactions applied
71    pub pii_redacted: Vec<String>,
72
73    /// Compliance validated
74    pub compliance_validated: bool,
75
76    /// Output approved
77    pub approved: bool,
78}
79
/// IPO Pipeline - Input => Process => Output
///
/// Holds the two configuration flags that govern how an event is pushed
/// through the input, process and output stages. Like the other public
/// types in this module it derives `Debug` and `Clone`, so a pipeline can be
/// logged and duplicated by callers.
#[derive(Debug, Clone)]
pub struct IPOPipeline {
    /// When `true`, a failed compliance check or an unapproved output makes
    /// event processing return an error instead of continuing.
    strict_mode: bool,

    /// Whether the pipeline is restricted to a local LLM only.
    local_only: bool,
}
88
89impl IPOPipeline {
90    /// Create a new IPO pipeline
91    pub fn new(strict_mode: bool, local_only: bool) -> Self {
92        Self {
93            strict_mode,
94            local_only,
95        }
96    }
97
98    /// Check if pipeline is in local-only mode
99    pub fn is_local_only(&self) -> bool {
100        self.local_only
101    }
102
103    /// Check if pipeline is in strict mode
104    pub fn is_strict_mode(&self) -> bool {
105        self.strict_mode
106    }
107
108    /// Process an event through the IPO pipeline
109    pub async fn process_event(
110        &self,
111        event_type: EventType,
112        event_data: EventPayload,
113        runtime: Arc<dyn std::any::Any + Send + Sync>,
114    ) -> Result<Output> {
115        info!(
116            "IPO Pipeline: Processing {:?} event (strict_mode={}, local_only={})",
117            event_type, self.strict_mode, self.local_only
118        );
119
120        // === INPUT STAGE ===
121        debug!("IPO: Input stage - validating event");
122        let input = self.input_stage(event_type, event_data).await?;
123
124        if !input.validated {
125            return Err(ZoeyError::validation("Input validation failed"));
126        }
127
128        if self.strict_mode && !input.compliance_passed {
129            return Err(ZoeyError::validation("Input failed compliance check"));
130        }
131
132        // === PROCESS STAGE ===
133        debug!("IPO: Process stage - planning and execution");
134        let process = self.process_stage(&input, runtime.clone()).await?;
135
136        // === OUTPUT STAGE ===
137        debug!("IPO: Output stage - generating and validating output");
138        let output = self.output_stage(&process, runtime).await?;
139
140        if self.strict_mode && !output.approved {
141            return Err(ZoeyError::validation("Output failed approval"));
142        }
143
144        info!(
145            "IPO Pipeline: Complete - {} responses generated",
146            output.responses.len()
147        );
148        Ok(output)
149    }
150
151    /// Input stage - validate and check compliance
152    async fn input_stage(&self, event_type: EventType, event_data: EventPayload) -> Result<Input> {
153        let timestamp = chrono::Utc::now().timestamp();
154
155        // Validate input
156        let validated = true; // Would perform actual validation
157
158        // Check compliance (placeholder)
159        let compliance_passed = true; // Would check against judgment plugin
160
161        Ok(Input {
162            event_type,
163            event_data,
164            timestamp,
165            validated,
166            compliance_passed,
167        })
168    }
169
170    /// Process stage - plan and prepare
171    async fn process_stage(
172        &self,
173        _input: &Input,
174        _runtime: Arc<dyn std::any::Any + Send + Sync>,
175    ) -> Result<Process> {
176        let input_id = uuid::Uuid::new_v4();
177
178        // Use reaction planner functor to determine actions
179        let planned_actions = vec!["REPLY".to_string()]; // Would use ReactionPlannerFunctor
180
181        // Compose state (would use actual runtime)
182        let state_hash = "state_hash_placeholder".to_string();
183
184        // Make processing decisions
185        let decisions = vec![ProcessDecision {
186            decision_type: "RESPOND".to_string(),
187            reasoning: "User asked a question".to_string(),
188            confidence: 0.9,
189        }];
190
191        // Assess risk
192        let risk_level = "LOW".to_string();
193
194        Ok(Process {
195            input_id,
196            planned_actions,
197            state_hash,
198            decisions,
199            risk_level,
200        })
201    }
202
203    /// Output stage - generate and validate
204    async fn output_stage(
205        &self,
206        _process: &Process,
207        _runtime: Arc<dyn std::any::Any + Send + Sync>,
208    ) -> Result<Output> {
209        let process_id = uuid::Uuid::new_v4();
210
211        // Generate responses (would use OutputPlannerFunctor + LLM)
212        let responses = vec![]; // Placeholder
213
214        // Scan for PII and redact (would use judgment plugin)
215        let pii_redacted = vec![];
216
217        // Validate compliance
218        let compliance_validated = true;
219
220        // Approve output if compliant
221        let approved = compliance_validated && (!self.strict_mode || pii_redacted.is_empty());
222
223        Ok(Output {
224            process_id,
225            responses,
226            pii_redacted,
227            compliance_validated,
228            approved,
229        })
230    }
231}
232
233impl Default for IPOPipeline {
234    fn default() -> Self {
235        Self::new(false, false)
236    }
237}
238
239/// Government-compliant IPO pipeline
240pub fn create_government_pipeline() -> IPOPipeline {
241    IPOPipeline::new(true, true) // Strict mode + local only
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an empty generic `MessageReceived` event for driving the pipeline.
    fn sample_event() -> (EventType, EventPayload) {
        (
            EventType::MessageReceived,
            EventPayload::Generic(std::collections::HashMap::new()),
        )
    }

    /// The default (non-strict) pipeline runs end to end and yields the
    /// placeholder output.
    #[tokio::test]
    async fn test_ipo_pipeline() {
        let pipeline = IPOPipeline::default();
        let (event_type, event_data) = sample_event();

        // The stages do not read the runtime yet, so any `Any + Send + Sync`
        // value can stand in for it.
        let output = pipeline
            .process_event(event_type, event_data, Arc::new(()))
            .await
            .unwrap();

        assert!(output.responses.is_empty());
        assert!(output.pii_redacted.is_empty());
        assert!(output.compliance_validated);
        assert!(output.approved);
    }

    /// The government preset enables both flags, and the public getters
    /// agree with the stored fields.
    #[test]
    fn test_government_pipeline() {
        let pipeline = create_government_pipeline();
        assert!(pipeline.strict_mode);
        assert!(pipeline.local_only);
        assert!(pipeline.is_strict_mode());
        assert!(pipeline.is_local_only());
    }

    /// Strict mode still lets an event through when every check passes and
    /// no PII redactions were recorded.
    #[tokio::test]
    async fn test_government_pipeline_processes_event() {
        let pipeline = create_government_pipeline();
        let (event_type, event_data) = sample_event();

        let output = pipeline
            .process_event(event_type, event_data, Arc::new(()))
            .await
            .unwrap();

        assert!(output.approved);
        assert!(output.pii_redacted.is_empty());
    }

    /// The input stage marks the event as validated and compliant.
    #[tokio::test]
    async fn test_input_stage() {
        let pipeline = IPOPipeline::default();
        let (event_type, event_data) = sample_event();

        let input = pipeline.input_stage(event_type, event_data).await.unwrap();
        assert!(input.validated);
        assert!(input.compliance_passed);
    }
}