Skip to main content

peat_protocol/command/
storage_trait.rs

//! Storage abstraction for hierarchical commands
//!
//! This module defines the backend-agnostic storage interface for command
//! dissemination, allowing different CRDT backends (Ditto, Automerge/Iroh) to be
//! used interchangeably.
6
7use crate::Result;
8use async_trait::async_trait;
9use peat_schema::command::v1::{CommandAcknowledgment, CommandStatus, HierarchicalCommand};
10
11/// Backend-agnostic storage interface for hierarchical commands
12///
13/// This trait abstracts over different CRDT storage backends (Ditto, Automerge/Iroh)
14/// and provides the core operations needed for command dissemination.
15///
16/// # Design Principles
17///
18/// 1. **Publish-Once Pattern**: Each command is published once to the collection
19/// 2. **Acknowledge-Many Pattern**: Multiple nodes acknowledge a single command
20/// 3. **Backend Flexibility**: Implementations handle CRDT semantics differently
21///
22/// # Implementation Notes
23///
24/// - **Ditto**: Uses DQL INSERT for commands, observer-based subscription for reception
25/// - **Automerge/Iroh**: Uses CRDT operations on Automerge documents
26/// - Both must support observer patterns for real-time command reception
27#[async_trait]
28pub trait CommandStorage: Send + Sync {
29    // ========================================================================
30    // Command Operations
31    // ========================================================================
32
33    /// Publish a command to the storage backend
34    ///
35    /// # Arguments
36    ///
37    /// * `command` - The hierarchical command to publish
38    ///
39    /// # Returns
40    ///
41    /// Document ID on success
42    ///
43    /// # Errors
44    ///
45    /// Returns error if publish fails (network error, validation failure)
46    async fn publish_command(&self, command: &HierarchicalCommand) -> Result<String>;
47
48    /// Retrieve a command by ID
49    ///
50    /// # Returns
51    ///
52    /// Some(HierarchicalCommand) if found, None if not found
53    async fn get_command(&self, command_id: &str) -> Result<Option<HierarchicalCommand>>;
54
55    /// Query commands by target
56    ///
57    /// Returns all commands targeting the specified node/squad/platoon
58    async fn query_commands_by_target(&self, target_id: &str) -> Result<Vec<HierarchicalCommand>>;
59
60    /// Delete a command (when expired or completed)
61    async fn delete_command(&self, command_id: &str) -> Result<()>;
62
63    // ========================================================================
64    // Acknowledgment Operations
65    // ========================================================================
66
67    /// Publish an acknowledgment for a command
68    ///
69    /// # Arguments
70    ///
71    /// * `ack` - The command acknowledgment to publish
72    ///
73    /// # Returns
74    ///
75    /// Document ID on success
76    async fn publish_acknowledgment(&self, ack: &CommandAcknowledgment) -> Result<String>;
77
78    /// Get all acknowledgments for a command
79    ///
80    /// # Returns
81    ///
82    /// Vector of acknowledgments for the specified command
83    async fn get_acknowledgments(&self, command_id: &str) -> Result<Vec<CommandAcknowledgment>>;
84
85    // ========================================================================
86    // Status Tracking Operations
87    // ========================================================================
88
89    /// Update command status
90    ///
91    /// # Arguments
92    ///
93    /// * `status` - The updated command status
94    async fn update_command_status(&self, status: &CommandStatus) -> Result<()>;
95
96    /// Get command status
97    ///
98    /// # Returns
99    ///
100    /// Some(CommandStatus) if found, None if not found
101    async fn get_command_status(&self, command_id: &str) -> Result<Option<CommandStatus>>;
102
103    // ========================================================================
104    // Observer Pattern (for real-time command reception)
105    // ========================================================================
106
107    /// Register a callback for new commands targeting this node
108    ///
109    /// # Arguments
110    ///
111    /// * `node_id` - The node ID to filter commands for
112    /// * `callback` - Async callback invoked when new commands arrive
113    ///
114    /// # Returns
115    ///
116    /// Observer handle (implementation-specific)
117    ///
118    /// # Note
119    ///
120    /// This is the critical method for real-time command reception.
121    /// Implementations should use native observer patterns (Ditto observers, Automerge subscriptions).
122    async fn observe_commands(
123        &self,
124        node_id: &str,
125        callback: Box<
126            dyn Fn(
127                    HierarchicalCommand,
128                )
129                    -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
130                + Send
131                + Sync,
132        >,
133    ) -> Result<ObserverHandle>;
134
135    /// Register a callback for new acknowledgments for commands issued by this node
136    ///
137    /// # Arguments
138    ///
139    /// * `issuer_id` - The node ID that issued commands
140    /// * `callback` - Async callback invoked when new acknowledgments arrive
141    async fn observe_acknowledgments(
142        &self,
143        issuer_id: &str,
144        callback: Box<
145            dyn Fn(
146                    CommandAcknowledgment,
147                )
148                    -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
149                + Send
150                + Sync,
151        >,
152    ) -> Result<ObserverHandle>;
153}
154
/// Handle for an active observer subscription
///
/// Dropping this handle should cancel the observation, so the value returned by
/// an `observe_*` call has to be kept alive for as long as callbacks are wanted.
/// The type is `#[must_use]` so that discarding it by accident is reported.
///
/// The backend's own handle is stored type-erased. Dropping the last
/// `ObserverHandle` drops that wrapped value; whether this actually stops the
/// observation depends on the wrapped type's `Drop` behavior.
#[derive(Debug)]
#[must_use = "dropping an ObserverHandle should cancel the observation"]
pub struct ObserverHandle {
    /// Implementation-specific handle (`Arc<dyn Any>` for type erasure)
    inner: std::sync::Arc<dyn std::any::Any + Send + Sync>,
}

impl ObserverHandle {
    /// Create a new observer handle
    ///
    /// # Arguments
    ///
    /// * `handle` - The backend's subscription object; any `Any + Send + Sync`
    ///   value is accepted and stored type-erased
    pub fn new<T: std::any::Any + Send + Sync>(handle: T) -> Self {
        Self {
            inner: std::sync::Arc::new(handle),
        }
    }

    /// Get the inner handle (for backend-specific operations)
    ///
    /// # Returns
    ///
    /// A reference to the shared, type-erased handle. Prefer
    /// [`ObserverHandle::downcast_ref`] when only typed access is needed.
    pub fn inner(&self) -> &std::sync::Arc<dyn std::any::Any + Send + Sync> {
        &self.inner
    }

    /// Borrow the inner handle as its concrete backend type
    ///
    /// # Returns
    ///
    /// `Some(&T)` if the handle was created from a value of type `T`,
    /// `None` for any other type.
    pub fn downcast_ref<T: std::any::Any>(&self) -> Option<&T> {
        // Method resolution derefs through the `Arc` to `dyn Any`, whose inherent
        // `downcast_ref` performs the type check.
        self.inner.downcast_ref::<T>()
    }
}

#[cfg(test)]
mod tests {
    use super::ObserverHandle;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    #[test]
    fn test_observer_handle_creation() {
        // The handle is type-erased, so its mechanics can be checked without any
        // backend; this test does not cover backend-specific observer behavior.
        let handle = ObserverHandle::new(String::from("subscription-1"));

        assert_eq!(
            handle.downcast_ref::<String>().map(String::as_str),
            Some("subscription-1")
        );
        assert!(handle.downcast_ref::<u32>().is_none());
        assert!(handle.inner().downcast_ref::<String>().is_some());
    }

    #[test]
    fn test_observer_handle_drop_releases_inner() {
        /// Records in a shared flag that it has been dropped.
        struct DropFlag(Arc<AtomicBool>);

        impl Drop for DropFlag {
            fn drop(&mut self) {
                self.0.store(true, Ordering::SeqCst);
            }
        }

        let dropped = Arc::new(AtomicBool::new(false));
        let handle = ObserverHandle::new(DropFlag(Arc::clone(&dropped)));
        assert!(!dropped.load(Ordering::SeqCst));

        drop(handle);
        assert!(dropped.load(Ordering::SeqCst));
    }
}