duroxide/providers/management.rs
//! Management and observability provider interface.
//!
//! Separate from the core Provider trait, this interface provides
//! administrative and debugging capabilities.
5
6use super::ProviderError;
7use crate::Event;
8
9/// Management provider for observability and administrative operations.
10///
11/// This trait is separate from `Provider` to:
12/// - Separate hot-path (runtime) from cold-path (admin) operations
13/// - Allow different implementations (e.g., read replicas, analytics DBs)
14/// - Enable extension without breaking the core Provider interface
15///
16/// # Implementation
17///
18/// Providers can implement this alongside `Provider`:
19///
20/// ```ignore
21/// impl Provider for SqliteProvider { /* runtime ops */ }
22/// impl ManagementProvider for SqliteProvider { /* admin ops */ }
23/// ```
24///
25/// # Usage
26///
27/// ```ignore
28/// let store = Arc::new(SqliteProvider::new("sqlite:./data.db").await?);
29/// let mgmt: Arc<dyn ManagementProvider> = store.clone();
30///
31/// // List all instances
32/// let instances = mgmt.list_instances().await?;
33///
34/// // Get execution details
35/// let executions = mgmt.list_executions("order-123").await?;
36/// let history = mgmt.read_execution("order-123", 1).await?;
37/// ```
38#[async_trait::async_trait]
39pub trait ManagementProvider: Send + Sync {
40 // ===== Instance Discovery =====
41
42 /// List all known instance IDs.
43 ///
44 /// # Returns
45 ///
46 /// Vector of instance IDs, typically sorted by creation time (newest first).
47 ///
48 /// # Use Cases
49 ///
50 /// - Admin dashboards showing all workflows
51 /// - Bulk operations across instances
52 /// - Testing (verify instance creation)
53 ///
54 /// # Implementation Example
55 ///
56 /// ```ignore
57 /// async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
58 /// SELECT instance_id FROM instances ORDER BY created_at DESC
59 /// }
60 /// ```
61 ///
62 /// # Default
63 ///
64 /// Returns empty Vec if not supported.
65 async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
66 Ok(Vec::new())
67 }
68
69 /// List instances matching a status filter.
70 ///
71 /// # Parameters
72 ///
73 /// * `status` - Filter by execution status: "Running", "Completed", "Failed", "ContinuedAsNew"
74 ///
75 /// # Returns
76 ///
77 /// Vector of instance IDs with the specified status.
78 ///
79 /// # Implementation Example
80 ///
81 /// ```ignore
82 /// async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
83 /// SELECT i.instance_id FROM instances i
84 /// JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
85 /// WHERE e.status = ?
86 /// ORDER BY i.created_at DESC
87 /// }
88 /// ```
89 ///
90 /// # Default
91 ///
92 /// Returns empty Vec if not supported.
93 async fn list_instances_by_status(&self, _status: &str) -> Result<Vec<String>, ProviderError> {
94 Ok(Vec::new())
95 }
96
97 // ===== Execution Inspection =====
98
99 /// List all execution IDs for an instance.
100 ///
101 /// # Returns
102 ///
103 /// Vector of execution IDs in ascending order: \[1\], \[1, 2\], \[1, 2, 3\], etc.
104 ///
105 /// # Multi-Execution Context
106 ///
107 /// When an orchestration uses ContinueAsNew, multiple executions exist:
108 /// - Execution 1: Initial run, ends with OrchestrationContinuedAsNew
109 /// - Execution 2: Continuation, may end with Completed or another ContinueAsNew
110 /// - etc.
111 ///
112 /// # Use Cases
113 ///
114 /// - Verify ContinueAsNew created multiple executions
115 /// - Debug execution progression
116 /// - Audit trail inspection
117 ///
118 /// # Implementation Example
119 ///
120 /// ```ignore
121 /// async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
122 /// SELECT execution_id FROM executions
123 /// WHERE instance_id = ?
124 /// ORDER BY execution_id ASC
125 /// }
126 /// ```
127 ///
128 /// # Default
129 ///
130 /// Returns \[1\] if instance exists, empty Vec otherwise.
131 async fn list_executions(&self, _instance: &str) -> Result<Vec<u64>, ProviderError> {
132 // Default: assume single execution
133 Ok(vec![1])
134 }
135
136 /// Read history for a specific execution.
137 ///
138 /// # Parameters
139 ///
140 /// * `instance` - Instance ID
141 /// * `execution_id` - Specific execution to read (1, 2, 3, ...)
142 ///
143 /// # Returns
144 ///
145 /// Events for the specified execution, ordered by event_id.
146 ///
147 /// # Use Cases
148 ///
149 /// - Debug specific execution in multi-execution instance
150 /// - Inspect what happened in execution 1 after ContinueAsNew created execution 2
151 /// - Audit trail for specific execution
152 ///
153 /// # Difference from Provider.read()
154 ///
155 /// - `Provider.read(instance)` → Returns LATEST execution's history
156 /// - `ManagementProvider.read_execution(instance, exec_id)` → Returns SPECIFIC execution's history
157 ///
158 /// # Implementation Example
159 ///
160 /// ```ignore
161 /// async fn read_execution(&self, instance: &str, execution_id: u64) -> Result<Vec<Event>, ProviderError> {
162 /// SELECT event_data FROM history
163 /// WHERE instance_id = ? AND execution_id = ?
164 /// ORDER BY event_id ASC
165 /// }
166 /// ```
167 ///
168 /// # Default
169 ///
170 /// Returns error indicating not supported.
171 async fn read_execution(&self, instance: &str, _execution_id: u64) -> Result<Vec<Event>, ProviderError> {
172 Err(ProviderError::permanent(
173 "read_execution",
174 format!("not supported for instance: {instance}"),
175 ))
176 }
177
178 /// Get the latest (current) execution ID for an instance.
179 ///
180 /// # Returns
181 ///
182 /// * `Ok(execution_id)` - The highest execution ID for this instance
183 /// * `Err(msg)` - Instance not found or error
184 ///
185 /// # Use Cases
186 ///
187 /// - Determine how many times an instance has continued
188 /// - Check current execution number
189 /// - Debugging multi-execution workflows
190 ///
191 /// # Implementation Example
192 ///
193 /// ```ignore
194 /// async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
195 /// SELECT COALESCE(MAX(execution_id), 1) FROM executions WHERE instance_id = ?
196 /// }
197 /// ```
198 ///
199 /// # Default
200 ///
201 /// Returns 1 (assumes single execution).
202 async fn latest_execution_id(&self, _instance: &str) -> Result<u64, ProviderError> {
203 Ok(1)
204 }
205
206 // ===== Instance Metadata =====
207
208 /// Get comprehensive information about an instance.
209 ///
210 /// # Returns
211 ///
212 /// Metadata about the instance including name, version, status, timestamps.
213 ///
214 /// # Use Cases
215 ///
216 /// - Admin dashboard showing instance details
217 /// - CLI tools displaying instance info
218 /// - Monitoring systems
219 ///
220 /// # Implementation Example
221 ///
222 /// ```ignore
223 /// async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
224 /// SELECT i.orchestration_name, i.orchestration_version, i.current_execution_id,
225 /// e.status, e.output, i.created_at, e.completed_at
226 /// FROM instances i
227 /// LEFT JOIN executions e ON i.instance_id = e.instance_id
228 /// AND i.current_execution_id = e.execution_id
229 /// WHERE i.instance_id = ?
230 /// }
231 /// ```
232 ///
233 /// # Default
234 ///
235 /// Returns error indicating not supported.
236 async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
237 Err(ProviderError::permanent(
238 "get_instance_info",
239 format!("not supported for instance: {instance}"),
240 ))
241 }
242
243 /// Get detailed metadata for a specific execution.
244 ///
245 /// # Returns
246 ///
247 /// Information about a specific execution including status, output, event count, timestamps.
248 ///
249 /// # Use Cases
250 ///
251 /// - Inspect individual executions in ContinueAsNew workflows
252 /// - Debug execution-specific issues
253 /// - Performance analysis (event count, duration)
254 ///
255 /// # Implementation Example
256 ///
257 /// ```ignore
258 /// async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ProviderError> {
259 /// SELECT status, output, started_at, completed_at,
260 /// (SELECT COUNT(*) FROM history WHERE instance_id = ? AND execution_id = ?) as event_count
261 /// FROM executions
262 /// WHERE instance_id = ? AND execution_id = ?
263 /// }
264 /// ```
265 ///
266 /// # Default
267 ///
268 /// Returns error indicating not supported.
269 async fn get_execution_info(&self, instance: &str, _execution_id: u64) -> Result<ExecutionInfo, ProviderError> {
270 Err(ProviderError::permanent(
271 "get_execution_info",
272 format!("not supported for instance: {instance}"),
273 ))
274 }
275
276 // ===== System Metrics =====
277
278 /// Get system-wide orchestration metrics.
279 ///
280 /// # Returns
281 ///
282 /// Aggregate statistics: total instances, running count, completed count, failed count, etc.
283 ///
284 /// # Use Cases
285 ///
286 /// - Monitoring dashboards
287 /// - Health checks
288 /// - Capacity planning
289 ///
290 /// # Implementation Example
291 ///
292 /// ```text
293 /// async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
294 /// SELECT
295 /// COUNT(DISTINCT i.instance_id) as total_instances,
296 /// COUNT(DISTINCT e.execution_id) as total_executions,
297 /// SUM(CASE WHEN e.status = 'Running' THEN 1 ELSE 0 END) as running,
298 /// SUM(CASE WHEN e.status = 'Completed' THEN 1 ELSE 0 END) as completed,
299 /// SUM(CASE WHEN e.status = 'Failed' THEN 1 ELSE 0 END) as failed
300 /// FROM instances i
301 /// JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
302 /// }
303 /// ```
304 ///
305 /// # Default
306 ///
307 /// Returns default/empty metrics.
308 async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
309 Ok(SystemMetrics::default())
310 }
311
312 /// Get current queue depths.
313 ///
314 /// # Returns
315 ///
316 /// Number of unlocked messages in each queue.
317 ///
318 /// # Use Cases
319 ///
320 /// - Monitor backlog
321 /// - Capacity planning
322 /// - Performance troubleshooting
323 ///
324 /// # Implementation Example
325 ///
326 /// ```ignore
327 /// async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
328 /// SELECT
329 /// (SELECT COUNT(*) FROM orchestrator_queue WHERE lock_token IS NULL) as orch,
330 /// (SELECT COUNT(*) FROM worker_queue WHERE lock_token IS NULL) as worker,
331 /// (SELECT COUNT(*) FROM timer_queue WHERE lock_token IS NULL) as timer
332 /// }
333 /// ```
334 ///
335 /// # Default
336 ///
337 /// Returns zeros.
338 async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
339 Ok(QueueDepths::default())
340 }
341}
342
// ===== Supporting Types =====
344
/// Comprehensive instance metadata.
///
/// Implements `PartialEq`/`Eq` so values can be compared directly
/// (e.g. with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InstanceInfo {
    /// Identifier of the orchestration instance.
    pub instance_id: String,
    /// Name of the orchestration.
    pub orchestration_name: String,
    /// Version of the orchestration.
    pub orchestration_version: String,
    /// ID of the instance's current execution.
    pub current_execution_id: u64,
    /// Status string: "Running", "Completed", "Failed", or "ContinuedAsNew".
    pub status: String,
    /// Terminal output or error, if any.
    pub output: Option<String>,
    /// Creation time, in milliseconds since epoch.
    pub created_at: u64,
    /// Last-update time; presumably milliseconds since epoch like
    /// `created_at` (TODO confirm against provider implementations).
    pub updated_at: u64,
}
357
/// Execution-specific metadata.
///
/// Implements `PartialEq`/`Eq` so values can be compared directly
/// (e.g. with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionInfo {
    /// ID of this execution within its instance.
    pub execution_id: u64,
    /// Status string: "Running", "Completed", "Failed", or "ContinuedAsNew".
    pub status: String,
    /// Terminal output, error, or next input, if any.
    pub output: Option<String>,
    /// Start time, in milliseconds since epoch.
    pub started_at: u64,
    /// Completion time; `None` if still running.
    pub completed_at: Option<u64>,
    /// Number of events in this execution.
    pub event_count: usize,
}
368
/// System-wide orchestration metrics.
///
/// `Default` yields all counters set to zero. Implements `PartialEq`/`Eq`
/// so snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SystemMetrics {
    /// Total number of instances.
    pub total_instances: u64,
    /// Total number of executions.
    pub total_executions: u64,
    /// Number of instances counted as running.
    pub running_instances: u64,
    /// Number of instances counted as completed.
    pub completed_instances: u64,
    /// Number of instances counted as failed.
    pub failed_instances: u64,
    /// Total number of events; presumably history events across all
    /// executions (TODO confirm against provider implementations).
    pub total_events: u64,
}
379
/// Queue depth information.
///
/// `Default` yields all depths set to zero. Implements `PartialEq`/`Eq`
/// so snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueDepths {
    /// Unlocked orchestrator messages.
    pub orchestrator_queue: usize,
    /// Unlocked worker messages.
    pub worker_queue: usize,
    /// Unlocked timer messages.
    pub timer_queue: usize,
}