peat_protocol/command/storage_trait.rs
1//! Storage abstraction for hierarchical commands
2//!
3//! This module defines the backend-agnostic storage interface for command
4//! dissemination, allowing different CRDT backends (Ditto, Automerge/Iroh) to be
5//! used interchangeably.
6
7use crate::Result;
8use async_trait::async_trait;
9use peat_schema::command::v1::{CommandAcknowledgment, CommandStatus, HierarchicalCommand};
10
11/// Backend-agnostic storage interface for hierarchical commands
12///
13/// This trait abstracts over different CRDT storage backends (Ditto, Automerge/Iroh)
14/// and provides the core operations needed for command dissemination.
15///
16/// # Design Principles
17///
18/// 1. **Publish-Once Pattern**: Each command is published once to the collection
19/// 2. **Acknowledge-Many Pattern**: Multiple nodes acknowledge a single command
20/// 3. **Backend Flexibility**: Implementations handle CRDT semantics differently
21///
22/// # Implementation Notes
23///
24/// - **Ditto**: Uses DQL INSERT for commands, observer-based subscription for reception
25/// - **Automerge/Iroh**: Uses CRDT operations on Automerge documents
26/// - Both must support observer patterns for real-time command reception
27#[async_trait]
28pub trait CommandStorage: Send + Sync {
29 // ========================================================================
30 // Command Operations
31 // ========================================================================
32
33 /// Publish a command to the storage backend
34 ///
35 /// # Arguments
36 ///
37 /// * `command` - The hierarchical command to publish
38 ///
39 /// # Returns
40 ///
41 /// Document ID on success
42 ///
43 /// # Errors
44 ///
45 /// Returns error if publish fails (network error, validation failure)
46 async fn publish_command(&self, command: &HierarchicalCommand) -> Result<String>;
47
48 /// Retrieve a command by ID
49 ///
50 /// # Returns
51 ///
52 /// Some(HierarchicalCommand) if found, None if not found
53 async fn get_command(&self, command_id: &str) -> Result<Option<HierarchicalCommand>>;
54
55 /// Query commands by target
56 ///
57 /// Returns all commands targeting the specified node/squad/platoon
58 async fn query_commands_by_target(&self, target_id: &str) -> Result<Vec<HierarchicalCommand>>;
59
60 /// Delete a command (when expired or completed)
61 async fn delete_command(&self, command_id: &str) -> Result<()>;
62
63 // ========================================================================
64 // Acknowledgment Operations
65 // ========================================================================
66
67 /// Publish an acknowledgment for a command
68 ///
69 /// # Arguments
70 ///
71 /// * `ack` - The command acknowledgment to publish
72 ///
73 /// # Returns
74 ///
75 /// Document ID on success
76 async fn publish_acknowledgment(&self, ack: &CommandAcknowledgment) -> Result<String>;
77
78 /// Get all acknowledgments for a command
79 ///
80 /// # Returns
81 ///
82 /// Vector of acknowledgments for the specified command
83 async fn get_acknowledgments(&self, command_id: &str) -> Result<Vec<CommandAcknowledgment>>;
84
85 // ========================================================================
86 // Status Tracking Operations
87 // ========================================================================
88
89 /// Update command status
90 ///
91 /// # Arguments
92 ///
93 /// * `status` - The updated command status
94 async fn update_command_status(&self, status: &CommandStatus) -> Result<()>;
95
96 /// Get command status
97 ///
98 /// # Returns
99 ///
100 /// Some(CommandStatus) if found, None if not found
101 async fn get_command_status(&self, command_id: &str) -> Result<Option<CommandStatus>>;
102
103 // ========================================================================
104 // Observer Pattern (for real-time command reception)
105 // ========================================================================
106
107 /// Register a callback for new commands targeting this node
108 ///
109 /// # Arguments
110 ///
111 /// * `node_id` - The node ID to filter commands for
112 /// * `callback` - Async callback invoked when new commands arrive
113 ///
114 /// # Returns
115 ///
116 /// Observer handle (implementation-specific)
117 ///
118 /// # Note
119 ///
120 /// This is the critical method for real-time command reception.
121 /// Implementations should use native observer patterns (Ditto observers, Automerge subscriptions).
122 async fn observe_commands(
123 &self,
124 node_id: &str,
125 callback: Box<
126 dyn Fn(
127 HierarchicalCommand,
128 )
129 -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
130 + Send
131 + Sync,
132 >,
133 ) -> Result<ObserverHandle>;
134
135 /// Register a callback for new acknowledgments for commands issued by this node
136 ///
137 /// # Arguments
138 ///
139 /// * `issuer_id` - The node ID that issued commands
140 /// * `callback` - Async callback invoked when new acknowledgments arrive
141 async fn observe_acknowledgments(
142 &self,
143 issuer_id: &str,
144 callback: Box<
145 dyn Fn(
146 CommandAcknowledgment,
147 )
148 -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
149 + Send
150 + Sync,
151 >,
152 ) -> Result<ObserverHandle>;
153}
154
/// Handle for an active observer subscription
///
/// Dropping this handle should cancel the observation.
///
/// The backend-specific subscription object is stored type-erased; use
/// [`ObserverHandle::downcast_ref`] to get it back as its concrete type.
pub struct ObserverHandle {
    /// Implementation-specific handle (`Arc<dyn Any>` for type erasure)
    inner: std::sync::Arc<dyn std::any::Any + Send + Sync>,
}

impl std::fmt::Debug for ObserverHandle {
    /// Formats the handle as an opaque struct.
    ///
    /// The wrapped value is `dyn Any` and therefore not printable, so only the
    /// type name is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ObserverHandle").finish_non_exhaustive()
    }
}

impl ObserverHandle {
    /// Create a new observer handle
    ///
    /// # Arguments
    ///
    /// * `handle` - Backend-specific value to wrap; it is moved into a new
    ///   `Arc` and stored type-erased.
    #[must_use]
    pub fn new<T: std::any::Any + Send + Sync>(handle: T) -> Self {
        Self {
            inner: std::sync::Arc::new(handle),
        }
    }

    /// Get the inner handle (for backend-specific operations)
    #[must_use]
    pub fn inner(&self) -> &std::sync::Arc<dyn std::any::Any + Send + Sync> {
        &self.inner
    }

    /// Borrow the wrapped value as the concrete type `T`
    ///
    /// # Returns
    ///
    /// `Some(&T)` if the handle was created from a value of type `T`,
    /// `None` otherwise.
    #[must_use]
    pub fn downcast_ref<T: std::any::Any>(&self) -> Option<&T> {
        // Deref through the `Arc` explicitly so the downcast targets the stored
        // value rather than the `Arc` wrapper.
        (*self.inner).downcast_ref::<T>()
    }
}
176
#[cfg(test)]
mod tests {
    use super::ObserverHandle;

    /// A handle can wrap any `Any + Send + Sync` value, so creation is
    /// exercised here with a plain integer standing in for a backend handle.
    #[test]
    fn test_observer_handle_creation() {
        let handle = ObserverHandle::new(7_u32);

        // The wrapped value is recoverable as its concrete type...
        assert_eq!(handle.inner().downcast_ref::<u32>(), Some(&7));
        // ...and only as that type.
        assert!(handle.inner().downcast_ref::<String>().is_none());
        // A fresh handle is the sole owner of its inner `Arc`.
        assert_eq!(std::sync::Arc::strong_count(handle.inner()), 1);
    }
}