duroxide/providers/
management.rs

1//! Management and observability provider interface.
2//!
3//! Separate from the core Provider trait, this interface provides
4//! administrative and debugging capabilities.
5
6use super::ProviderError;
7use crate::Event;
8
9/// Management provider for observability and administrative operations.
10///
11/// This trait is separate from `Provider` to:
12/// - Separate hot-path (runtime) from cold-path (admin) operations
13/// - Allow different implementations (e.g., read replicas, analytics DBs)
14/// - Enable extension without breaking the core Provider interface
15///
16/// # Implementation
17///
18/// Providers can implement this alongside `Provider`:
19///
20/// ```ignore
21/// impl Provider for SqliteProvider { /* runtime ops */ }
22/// impl ManagementProvider for SqliteProvider { /* admin ops */ }
23/// ```
24///
25/// # Usage
26///
27/// ```ignore
28/// let store = Arc::new(SqliteProvider::new("sqlite:./data.db").await?);
29/// let mgmt: Arc<dyn ManagementProvider> = store.clone();
30///
31/// // List all instances
32/// let instances = mgmt.list_instances().await?;
33///
34/// // Get execution details
35/// let executions = mgmt.list_executions("order-123").await?;
36/// let history = mgmt.read_execution("order-123", 1).await?;
37/// ```
38#[async_trait::async_trait]
39pub trait ManagementProvider: Send + Sync {
40    // ===== Instance Discovery =====
41
42    /// List all known instance IDs.
43    ///
44    /// # Returns
45    ///
46    /// Vector of instance IDs, typically sorted by creation time (newest first).
47    ///
48    /// # Use Cases
49    ///
50    /// - Admin dashboards showing all workflows
51    /// - Bulk operations across instances
52    /// - Testing (verify instance creation)
53    ///
54    /// # Implementation Example
55    ///
56    /// ```ignore
57    /// async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
58    ///     SELECT instance_id FROM instances ORDER BY created_at DESC
59    /// }
60    /// ```
61    ///
62    /// # Default
63    ///
64    /// Returns empty Vec if not supported.
65    async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
66        Ok(Vec::new())
67    }
68
69    /// List instances matching a status filter.
70    ///
71    /// # Parameters
72    ///
73    /// * `status` - Filter by execution status: "Running", "Completed", "Failed", "ContinuedAsNew"
74    ///
75    /// # Returns
76    ///
77    /// Vector of instance IDs with the specified status.
78    ///
79    /// # Implementation Example
80    ///
81    /// ```ignore
82    /// async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
83    ///     SELECT i.instance_id FROM instances i
84    ///     JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
85    ///     WHERE e.status = ?
86    ///     ORDER BY i.created_at DESC
87    /// }
88    /// ```
89    ///
90    /// # Default
91    ///
92    /// Returns empty Vec if not supported.
93    async fn list_instances_by_status(&self, _status: &str) -> Result<Vec<String>, ProviderError> {
94        Ok(Vec::new())
95    }
96
97    // ===== Execution Inspection =====
98
99    /// List all execution IDs for an instance.
100    ///
101    /// # Returns
102    ///
103    /// Vector of execution IDs in ascending order: \[1\], \[1, 2\], \[1, 2, 3\], etc.
104    ///
105    /// # Multi-Execution Context
106    ///
107    /// When an orchestration uses ContinueAsNew, multiple executions exist:
108    /// - Execution 1: Initial run, ends with OrchestrationContinuedAsNew
109    /// - Execution 2: Continuation, may end with Completed or another ContinueAsNew
110    /// - etc.
111    ///
112    /// # Use Cases
113    ///
114    /// - Verify ContinueAsNew created multiple executions
115    /// - Debug execution progression
116    /// - Audit trail inspection
117    ///
118    /// # Implementation Example
119    ///
120    /// ```ignore
121    /// async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
122    ///     SELECT execution_id FROM executions
123    ///     WHERE instance_id = ?
124    ///     ORDER BY execution_id ASC
125    /// }
126    /// ```
127    ///
128    /// # Default
129    ///
130    /// Returns \[1\] if instance exists, empty Vec otherwise.
131    async fn list_executions(&self, _instance: &str) -> Result<Vec<u64>, ProviderError> {
132        // Default: assume single execution
133        Ok(vec![1])
134    }
135
136    /// Read history for a specific execution.
137    ///
138    /// # Parameters
139    ///
140    /// * `instance` - Instance ID
141    /// * `execution_id` - Specific execution to read (1, 2, 3, ...)
142    ///
143    /// # Returns
144    ///
145    /// Events for the specified execution, ordered by event_id.
146    ///
147    /// # Use Cases
148    ///
149    /// - Debug specific execution in multi-execution instance
150    /// - Inspect what happened in execution 1 after ContinueAsNew created execution 2
151    /// - Audit trail for specific execution
152    ///
153    /// # Difference from Provider.read()
154    ///
155    /// - `Provider.read(instance)` → Returns LATEST execution's history
156    /// - `ManagementProvider.read_execution(instance, exec_id)` → Returns SPECIFIC execution's history
157    ///
158    /// # Implementation Example
159    ///
160    /// ```ignore
161    /// async fn read_execution(&self, instance: &str, execution_id: u64) -> Result<Vec<Event>, ProviderError> {
162    ///     SELECT event_data FROM history
163    ///     WHERE instance_id = ? AND execution_id = ?
164    ///     ORDER BY event_id ASC
165    /// }
166    /// ```
167    ///
168    /// # Default
169    ///
170    /// Returns error indicating not supported.
171    async fn read_execution(&self, instance: &str, _execution_id: u64) -> Result<Vec<Event>, ProviderError> {
172        Err(ProviderError::permanent(
173            "read_execution",
174            format!("not supported for instance: {instance}"),
175        ))
176    }
177
178    /// Get the latest (current) execution ID for an instance.
179    ///
180    /// # Returns
181    ///
182    /// * `Ok(execution_id)` - The highest execution ID for this instance
183    /// * `Err(msg)` - Instance not found or error
184    ///
185    /// # Use Cases
186    ///
187    /// - Determine how many times an instance has continued
188    /// - Check current execution number
189    /// - Debugging multi-execution workflows
190    ///
191    /// # Implementation Example
192    ///
193    /// ```ignore
194    /// async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
195    ///     SELECT COALESCE(MAX(execution_id), 1) FROM executions WHERE instance_id = ?
196    /// }
197    /// ```
198    ///
199    /// # Default
200    ///
201    /// Returns 1 (assumes single execution).
202    async fn latest_execution_id(&self, _instance: &str) -> Result<u64, ProviderError> {
203        Ok(1)
204    }
205
206    // ===== Instance Metadata =====
207
208    /// Get comprehensive information about an instance.
209    ///
210    /// # Returns
211    ///
212    /// Metadata about the instance including name, version, status, timestamps.
213    ///
214    /// # Use Cases
215    ///
216    /// - Admin dashboard showing instance details
217    /// - CLI tools displaying instance info
218    /// - Monitoring systems
219    ///
220    /// # Implementation Example
221    ///
222    /// ```ignore
223    /// async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
224    ///     SELECT i.orchestration_name, i.orchestration_version, i.current_execution_id,
225    ///            e.status, e.output, i.created_at, e.completed_at
226    ///     FROM instances i
227    ///     LEFT JOIN executions e ON i.instance_id = e.instance_id
228    ///         AND i.current_execution_id = e.execution_id
229    ///     WHERE i.instance_id = ?
230    /// }
231    /// ```
232    ///
233    /// # Default
234    ///
235    /// Returns error indicating not supported.
236    async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
237        Err(ProviderError::permanent(
238            "get_instance_info",
239            format!("not supported for instance: {instance}"),
240        ))
241    }
242
243    /// Get detailed metadata for a specific execution.
244    ///
245    /// # Returns
246    ///
247    /// Information about a specific execution including status, output, event count, timestamps.
248    ///
249    /// # Use Cases
250    ///
251    /// - Inspect individual executions in ContinueAsNew workflows
252    /// - Debug execution-specific issues
253    /// - Performance analysis (event count, duration)
254    ///
255    /// # Implementation Example
256    ///
257    /// ```ignore
258    /// async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ProviderError> {
259    ///     SELECT status, output, started_at, completed_at,
260    ///            (SELECT COUNT(*) FROM history WHERE instance_id = ? AND execution_id = ?) as event_count
261    ///     FROM executions
262    ///     WHERE instance_id = ? AND execution_id = ?
263    /// }
264    /// ```
265    ///
266    /// # Default
267    ///
268    /// Returns error indicating not supported.
269    async fn get_execution_info(&self, instance: &str, _execution_id: u64) -> Result<ExecutionInfo, ProviderError> {
270        Err(ProviderError::permanent(
271            "get_execution_info",
272            format!("not supported for instance: {instance}"),
273        ))
274    }
275
276    // ===== System Metrics =====
277
278    /// Get system-wide orchestration metrics.
279    ///
280    /// # Returns
281    ///
282    /// Aggregate statistics: total instances, running count, completed count, failed count, etc.
283    ///
284    /// # Use Cases
285    ///
286    /// - Monitoring dashboards
287    /// - Health checks
288    /// - Capacity planning
289    ///
290    /// # Implementation Example
291    ///
292    /// ```text
293    /// async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
294    ///     SELECT
295    ///         COUNT(DISTINCT i.instance_id) as total_instances,
296    ///         COUNT(DISTINCT e.execution_id) as total_executions,
297    ///         SUM(CASE WHEN e.status = 'Running' THEN 1 ELSE 0 END) as running,
298    ///         SUM(CASE WHEN e.status = 'Completed' THEN 1 ELSE 0 END) as completed,
299    ///         SUM(CASE WHEN e.status = 'Failed' THEN 1 ELSE 0 END) as failed
300    ///     FROM instances i
301    ///     JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
302    /// }
303    /// ```
304    ///
305    /// # Default
306    ///
307    /// Returns default/empty metrics.
308    async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
309        Ok(SystemMetrics::default())
310    }
311
312    /// Get current queue depths.
313    ///
314    /// # Returns
315    ///
316    /// Number of unlocked messages in each queue.
317    ///
318    /// # Use Cases
319    ///
320    /// - Monitor backlog
321    /// - Capacity planning
322    /// - Performance troubleshooting
323    ///
324    /// # Implementation Example
325    ///
326    /// ```ignore
327    /// async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
328    ///     SELECT
329    ///         (SELECT COUNT(*) FROM orchestrator_queue WHERE lock_token IS NULL) as orch,
330    ///         (SELECT COUNT(*) FROM worker_queue WHERE lock_token IS NULL) as worker,
331    ///         (SELECT COUNT(*) FROM timer_queue WHERE lock_token IS NULL) as timer
332    /// }
333    /// ```
334    ///
335    /// # Default
336    ///
337    /// Returns zeros.
338    async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
339        Ok(QueueDepths::default())
340    }
341}
342
343// ===== Supporting Types =====
344
/// Comprehensive instance metadata.
///
/// Describes an orchestration instance together with the state of its current
/// (latest) execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InstanceInfo {
    /// Instance ID.
    pub instance_id: String,
    /// Name of the orchestration this instance runs.
    pub orchestration_name: String,
    /// Version of the orchestration this instance runs.
    pub orchestration_version: String,
    /// ID of the current (latest) execution; greater than 1 after ContinueAsNew.
    pub current_execution_id: u64,
    /// Status of the current execution: `"Running"`, `"Completed"`, `"Failed"`,
    /// or `"ContinuedAsNew"`.
    pub status: String,
    /// Terminal output or error, if any.
    pub output: Option<String>,
    /// Creation time, in milliseconds since the Unix epoch.
    pub created_at: u64,
    /// Last-update time.
    ///
    /// NOTE(review): presumably milliseconds since the Unix epoch, like
    /// `created_at`. Confirm against provider implementations.
    pub updated_at: u64,
}
354    pub created_at: u64,        // Milliseconds since epoch
355    pub updated_at: u64,
356}
357
/// Execution-specific metadata.
///
/// One instance may have several executions when it uses ContinueAsNew; this
/// describes exactly one of them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionInfo {
    /// Execution ID within its instance (1, 2, 3, ...).
    pub execution_id: u64,
    /// Execution status: `"Running"`, `"Completed"`, `"Failed"`, or `"ContinuedAsNew"`.
    pub status: String,
    /// Terminal output, error, or the input handed to the next execution.
    pub output: Option<String>,
    /// Start time, in milliseconds since the Unix epoch.
    pub started_at: u64,
    /// Completion time; `None` while the execution is still running.
    ///
    /// NOTE(review): presumably milliseconds since the Unix epoch, like
    /// `started_at`. Confirm against provider implementations.
    pub completed_at: Option<u64>,
    /// Number of history events recorded for this execution.
    pub event_count: usize,
}
365    pub completed_at: Option<u64>, // None if still running
366    pub event_count: usize,        // Number of events in this execution
367}
368
/// System-wide orchestration metrics.
///
/// `Default` yields all-zero counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SystemMetrics {
    /// Total number of orchestration instances.
    pub total_instances: u64,
    /// Total number of executions across all instances (an instance has more
    /// than one after ContinueAsNew).
    pub total_executions: u64,
    /// Number of instances currently in the `"Running"` status.
    pub running_instances: u64,
    /// Number of instances in the `"Completed"` status.
    pub completed_instances: u64,
    /// Number of instances in the `"Failed"` status.
    pub failed_instances: u64,
    /// Total number of history events.
    ///
    /// NOTE(review): presumably summed across all executions. Confirm per provider.
    pub total_events: u64,
}
376    pub failed_instances: u64,
377    pub total_events: u64,
378}
379
/// Queue depth information: the number of unlocked messages in each queue.
///
/// `Default` yields all-zero depths.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueDepths {
    /// Unlocked messages in the orchestrator queue.
    pub orchestrator_queue: usize,
    /// Unlocked messages in the worker queue.
    pub worker_queue: usize,
    /// Unlocked messages in the timer queue.
    pub timer_queue: usize,
}
384    pub worker_queue: usize,       // Unlocked worker messages
385    pub timer_queue: usize,        // Unlocked timer messages
386}