Skip to main content

peat_protocol/storage/
automerge_command_storage.rs

1//! Automerge implementation of CommandStorage trait
2//!
3//! This module provides the Automerge backend implementation of the backend-agnostic
4//! CommandStorage trait, enabling hierarchical command dissemination with Automerge's CRDT engine.
5
6#[cfg(feature = "automerge-backend")]
7use crate::command::{CommandStorage, ObserverHandle};
8#[cfg(feature = "automerge-backend")]
9use crate::storage::automerge_conversion::{automerge_to_message, message_to_automerge};
10#[cfg(feature = "automerge-backend")]
11use crate::storage::automerge_store::AutomergeStore;
12#[cfg(feature = "automerge-backend")]
13use crate::Result;
14#[cfg(feature = "automerge-backend")]
15use async_trait::async_trait;
16#[cfg(feature = "automerge-backend")]
17use peat_schema::command::v1::{CommandAcknowledgment, CommandStatus, HierarchicalCommand};
18#[cfg(feature = "automerge-backend")]
19use std::sync::Arc;
20#[cfg(feature = "automerge-backend")]
21use std::time::{SystemTime, UNIX_EPOCH};
22#[cfg(feature = "automerge-backend")]
23use tracing::instrument;
24
/// `CommandStorage` backend that keeps its records in an `AutomergeStore`.
///
/// The struct is a thin wrapper around a shared (`Arc`) store handle; the
/// trait implementation below translates every command operation into
/// key/value operations on that store.
///
/// # Design
///
/// Records are separated into three namespaces by key prefix:
/// - `cmd:` holds published commands
/// - `ack:` holds command acknowledgments
/// - `status:` holds command execution status
///
/// A command is written once under its `cmd:` key. Target nodes find it
/// either with an explicit query or through an observer.
///
/// # Observer Pattern
///
/// Unlike Ditto, which has native observers, this backend implements the
/// observer methods by spawning a background task that re-scans the relevant
/// key prefix on a fixed interval and invokes the callback for entries it
/// has not delivered before.
#[cfg(feature = "automerge-backend")]
pub struct AutomergeCommandStorage {
    store: Arc<AutomergeStore>,
}
49
#[cfg(feature = "automerge-backend")]
impl AutomergeCommandStorage {
    /// Namespace prefix under which published commands are stored.
    const COMMANDS_PREFIX: &'static str = "cmd:";

    /// Namespace prefix under which acknowledgments are stored.
    const ACKS_PREFIX: &'static str = "ack:";

    /// Namespace prefix under which command status records are stored.
    const STATUS_PREFIX: &'static str = "status:";

    /// Wrap an existing, shared `AutomergeStore` in a command storage.
    pub fn new(store: Arc<AutomergeStore>) -> Self {
        AutomergeCommandStorage { store }
    }

    /// Borrow the underlying store handle for Automerge-specific operations.
    pub fn store(&self) -> &Arc<AutomergeStore> {
        &self.store
    }

    /// Storage key of a command: `cmd:<command_id>`.
    fn command_key(command_id: &str) -> String {
        [Self::COMMANDS_PREFIX, command_id].concat()
    }

    /// Storage key of one node's acknowledgment: `ack:<command_id>-<node_id>`.
    fn ack_key(command_id: &str, node_id: &str) -> String {
        [Self::ACKS_PREFIX, command_id, "-", node_id].concat()
    }

    /// Scan prefix shared by all acknowledgment keys of a command:
    /// `ack:<command_id>-`.
    ///
    /// Because `-` is an ordinary character, this prefix is also a prefix of
    /// acknowledgment keys belonging to any command whose id starts with
    /// `<command_id>-`.
    fn ack_prefix(command_id: &str) -> String {
        [Self::ACKS_PREFIX, command_id, "-"].concat()
    }

    /// Storage key of a command's status record: `status:<command_id>`.
    fn status_key(command_id: &str) -> String {
        [Self::STATUS_PREFIX, command_id].concat()
    }

    /// Current wall-clock time in microseconds since the Unix epoch.
    ///
    /// A clock set before the epoch yields `0` rather than an error.
    fn now_us() -> u64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        since_epoch.as_micros() as u64
    }
}
94
95#[cfg(feature = "automerge-backend")]
96#[async_trait]
97impl CommandStorage for AutomergeCommandStorage {
98    // ========================================================================
99    // Command Operations
100    // ========================================================================
101
102    #[instrument(skip(self, command), fields(command_id = %command.command_id))]
103    async fn publish_command(&self, command: &HierarchicalCommand) -> Result<String> {
104        let key = Self::command_key(&command.command_id);
105
106        // Wrap command with metadata for storage
107        let wrapper = CommandWrapper {
108            command: command.clone(),
109            published_at_us: Self::now_us(),
110        };
111
112        // Convert to Automerge document and store
113        let doc = message_to_automerge(&wrapper).map_err(|e| {
114            crate::Error::storage_error(
115                format!("Failed to convert command to Automerge: {}", e),
116                "publish_command",
117                Some(key.clone()),
118            )
119        })?;
120
121        self.store.put(&key, &doc).map_err(|e| {
122            crate::Error::storage_error(
123                format!("Failed to store command: {}", e),
124                "publish_command",
125                Some(key.clone()),
126            )
127        })?;
128
129        tracing::debug!(
130            command_id = %command.command_id,
131            key = %key,
132            "Published command to Automerge"
133        );
134
135        Ok(key)
136    }
137
138    #[instrument(skip(self), fields(command_id))]
139    async fn get_command(&self, command_id: &str) -> Result<Option<HierarchicalCommand>> {
140        let key = Self::command_key(command_id);
141
142        let doc = match self.store.get(&key) {
143            Ok(Some(doc)) => doc,
144            Ok(None) => return Ok(None),
145            Err(e) => {
146                return Err(crate::Error::storage_error(
147                    format!("Failed to get command: {}", e),
148                    "get_command",
149                    Some(key),
150                ))
151            }
152        };
153
154        let wrapper: CommandWrapper = automerge_to_message(&doc).map_err(|e| {
155            crate::Error::storage_error(
156                format!("Failed to deserialize command: {}", e),
157                "get_command",
158                Some(key),
159            )
160        })?;
161
162        Ok(Some(wrapper.command))
163    }
164
165    #[instrument(skip(self), fields(target_id))]
166    async fn query_commands_by_target(&self, target_id: &str) -> Result<Vec<HierarchicalCommand>> {
167        // Scan all commands and filter by target
168        let docs = self.store.scan_prefix(Self::COMMANDS_PREFIX).map_err(|e| {
169            crate::Error::storage_error(
170                format!("Failed to scan commands: {}", e),
171                "query_commands_by_target",
172                None,
173            )
174        })?;
175
176        let mut commands = Vec::new();
177        for (_key, doc) in docs {
178            if let Ok(wrapper) = automerge_to_message::<CommandWrapper>(&doc) {
179                // Check if target_ids contains the target_id
180                if let Some(ref target) = wrapper.command.target {
181                    if target.target_ids.contains(&target_id.to_string()) {
182                        commands.push(wrapper.command);
183                    }
184                }
185            }
186        }
187
188        Ok(commands)
189    }
190
191    #[instrument(skip(self), fields(command_id))]
192    async fn delete_command(&self, command_id: &str) -> Result<()> {
193        let key = Self::command_key(command_id);
194
195        self.store.delete(&key).map_err(|e| {
196            crate::Error::storage_error(
197                format!("Failed to delete command: {}", e),
198                "delete_command",
199                Some(key.clone()),
200            )
201        })?;
202
203        tracing::debug!(command_id = %command_id, "Deleted command from Automerge");
204
205        Ok(())
206    }
207
208    // ========================================================================
209    // Acknowledgment Operations
210    // ========================================================================
211
212    #[instrument(skip(self, ack), fields(command_id = %ack.command_id, node_id = %ack.node_id))]
213    async fn publish_acknowledgment(&self, ack: &CommandAcknowledgment) -> Result<String> {
214        let key = Self::ack_key(&ack.command_id, &ack.node_id);
215
216        // Wrap acknowledgment with metadata
217        let wrapper = AckWrapper {
218            acknowledgment: ack.clone(),
219            received_at_us: Self::now_us(),
220        };
221
222        // Convert to Automerge document and store
223        let doc = message_to_automerge(&wrapper).map_err(|e| {
224            crate::Error::storage_error(
225                format!("Failed to convert acknowledgment to Automerge: {}", e),
226                "publish_acknowledgment",
227                Some(key.clone()),
228            )
229        })?;
230
231        self.store.put(&key, &doc).map_err(|e| {
232            crate::Error::storage_error(
233                format!("Failed to store acknowledgment: {}", e),
234                "publish_acknowledgment",
235                Some(key.clone()),
236            )
237        })?;
238
239        tracing::debug!(
240            command_id = %ack.command_id,
241            node_id = %ack.node_id,
242            key = %key,
243            "Published acknowledgment to Automerge"
244        );
245
246        Ok(key)
247    }
248
249    #[instrument(skip(self), fields(command_id))]
250    async fn get_acknowledgments(&self, command_id: &str) -> Result<Vec<CommandAcknowledgment>> {
251        let prefix = Self::ack_prefix(command_id);
252
253        let docs = self.store.scan_prefix(&prefix).map_err(|e| {
254            crate::Error::storage_error(
255                format!("Failed to scan acknowledgments: {}", e),
256                "get_acknowledgments",
257                Some(command_id.to_string()),
258            )
259        })?;
260
261        let mut acks = Vec::new();
262        for (_key, doc) in docs {
263            if let Ok(wrapper) = automerge_to_message::<AckWrapper>(&doc) {
264                acks.push(wrapper.acknowledgment);
265            }
266        }
267
268        Ok(acks)
269    }
270
271    // ========================================================================
272    // Status Tracking Operations
273    // ========================================================================
274
275    #[instrument(skip(self, status), fields(command_id = %status.command_id))]
276    async fn update_command_status(&self, status: &CommandStatus) -> Result<()> {
277        let key = Self::status_key(&status.command_id);
278
279        // Wrap status with metadata
280        let wrapper = StatusWrapper {
281            status: status.clone(),
282            updated_at_us: Self::now_us(),
283        };
284
285        // Convert to Automerge document and store (upsert semantics)
286        let doc = message_to_automerge(&wrapper).map_err(|e| {
287            crate::Error::storage_error(
288                format!("Failed to convert status to Automerge: {}", e),
289                "update_command_status",
290                Some(key.clone()),
291            )
292        })?;
293
294        self.store.put(&key, &doc).map_err(|e| {
295            crate::Error::storage_error(
296                format!("Failed to store status: {}", e),
297                "update_command_status",
298                Some(key.clone()),
299            )
300        })?;
301
302        tracing::debug!(
303            command_id = %status.command_id,
304            state = status.state,
305            "Updated command status in Automerge"
306        );
307
308        Ok(())
309    }
310
311    #[instrument(skip(self), fields(command_id))]
312    async fn get_command_status(&self, command_id: &str) -> Result<Option<CommandStatus>> {
313        let key = Self::status_key(command_id);
314
315        let doc = match self.store.get(&key) {
316            Ok(Some(doc)) => doc,
317            Ok(None) => return Ok(None),
318            Err(e) => {
319                return Err(crate::Error::storage_error(
320                    format!("Failed to get status: {}", e),
321                    "get_command_status",
322                    Some(key),
323                ))
324            }
325        };
326
327        let wrapper: StatusWrapper = automerge_to_message(&doc).map_err(|e| {
328            crate::Error::storage_error(
329                format!("Failed to deserialize status: {}", e),
330                "get_command_status",
331                Some(key),
332            )
333        })?;
334
335        Ok(Some(wrapper.status))
336    }
337
338    // ========================================================================
339    // Observer Pattern
340    // ========================================================================
341
342    /// Register a callback for new commands targeting this node
343    ///
344    /// # Note on Automerge Implementation
345    ///
346    /// Unlike Ditto's native observer pattern, Automerge uses change notifications.
347    /// This implementation spawns a background task that:
348    /// 1. Subscribes to the store's change channel
349    /// 2. Filters changes for command keys
350    /// 3. Deserializes and checks if the command targets this node
351    /// 4. Invokes the callback for matching commands
352    async fn observe_commands(
353        &self,
354        node_id: &str,
355        callback: Box<
356            dyn Fn(
357                    HierarchicalCommand,
358                )
359                    -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
360                + Send
361                + Sync,
362        >,
363    ) -> Result<ObserverHandle> {
364        let store = Arc::clone(&self.store);
365        let node_id = node_id.to_string();
366        let callback = Arc::new(callback);
367
368        // Create a cancellation token using a channel
369        let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel::<()>(1);
370
371        // Spawn background task to poll for commands
372        let poll_store = Arc::clone(&store);
373        let poll_node_id = node_id.clone();
374        let poll_callback = Arc::clone(&callback);
375
376        tokio::spawn(async move {
377            let mut seen_commands: std::collections::HashSet<String> =
378                std::collections::HashSet::new();
379
380            // Initial scan for existing commands
381            if let Ok(docs) = poll_store.scan_prefix(Self::COMMANDS_PREFIX) {
382                for (key, doc) in docs {
383                    if let Ok(wrapper) = automerge_to_message::<CommandWrapper>(&doc) {
384                        if let Some(ref target) = wrapper.command.target {
385                            if target.target_ids.contains(&poll_node_id) {
386                                seen_commands.insert(key);
387                                let cmd = wrapper.command.clone();
388                                let cb = Arc::clone(&poll_callback);
389                                tokio::spawn(async move {
390                                    cb(cmd).await;
391                                });
392                            }
393                        }
394                    }
395                }
396            }
397
398            // Polling loop
399            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
400            loop {
401                tokio::select! {
402                    _ = cancel_rx.recv() => {
403                        tracing::debug!(node_id = %poll_node_id, "Command observer cancelled");
404                        break;
405                    }
406                    _ = interval.tick() => {
407                        if let Ok(docs) = poll_store.scan_prefix(Self::COMMANDS_PREFIX) {
408                            for (key, doc) in docs {
409                                if seen_commands.contains(&key) {
410                                    continue;
411                                }
412                                if let Ok(wrapper) = automerge_to_message::<CommandWrapper>(&doc) {
413                                    if let Some(ref target) = wrapper.command.target {
414                                        if target.target_ids.contains(&poll_node_id) {
415                                            seen_commands.insert(key);
416                                            let cmd = wrapper.command.clone();
417                                            let cb = Arc::clone(&poll_callback);
418                                            tokio::spawn(async move {
419                                                cb(cmd).await;
420                                            });
421                                        }
422                                    }
423                                }
424                            }
425                        }
426                    }
427                }
428            }
429        });
430
431        tracing::debug!(node_id = %node_id, "Registered command observer");
432
433        Ok(ObserverHandle::new(cancel_tx))
434    }
435
436    /// Register a callback for new acknowledgments for commands issued by this node
437    async fn observe_acknowledgments(
438        &self,
439        issuer_id: &str,
440        callback: Box<
441            dyn Fn(
442                    CommandAcknowledgment,
443                )
444                    -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
445                + Send
446                + Sync,
447        >,
448    ) -> Result<ObserverHandle> {
449        let store = Arc::clone(&self.store);
450        let issuer_id = issuer_id.to_string();
451        let callback = Arc::new(callback);
452
453        // Create a cancellation token
454        let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel::<()>(1);
455
456        // Spawn background task to poll for acknowledgments
457        let poll_store = Arc::clone(&store);
458        let poll_issuer_id = issuer_id.clone();
459        let poll_callback = Arc::clone(&callback);
460
461        tokio::spawn(async move {
462            let mut seen_acks: std::collections::HashSet<String> = std::collections::HashSet::new();
463
464            // Initial scan for existing acks
465            if let Ok(docs) = poll_store.scan_prefix(Self::ACKS_PREFIX) {
466                for (key, doc) in docs {
467                    if let Ok(wrapper) = automerge_to_message::<AckWrapper>(&doc) {
468                        // We need to check if the command was issued by this node
469                        // For now, we pass all acks - the caller can filter
470                        seen_acks.insert(key);
471                        let ack = wrapper.acknowledgment.clone();
472                        let cb = Arc::clone(&poll_callback);
473                        tokio::spawn(async move {
474                            cb(ack).await;
475                        });
476                    }
477                }
478            }
479
480            // Polling loop
481            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
482            loop {
483                tokio::select! {
484                    _ = cancel_rx.recv() => {
485                        tracing::debug!(issuer_id = %poll_issuer_id, "Acknowledgment observer cancelled");
486                        break;
487                    }
488                    _ = interval.tick() => {
489                        if let Ok(docs) = poll_store.scan_prefix(Self::ACKS_PREFIX) {
490                            for (key, doc) in docs {
491                                if seen_acks.contains(&key) {
492                                    continue;
493                                }
494                                if let Ok(wrapper) = automerge_to_message::<AckWrapper>(&doc) {
495                                    seen_acks.insert(key);
496                                    let ack = wrapper.acknowledgment.clone();
497                                    let cb = Arc::clone(&poll_callback);
498                                    tokio::spawn(async move {
499                                        cb(ack).await;
500                                    });
501                                }
502                            }
503                        }
504                    }
505                }
506            }
507        });
508
509        tracing::debug!(issuer_id = %issuer_id, "Registered acknowledgment observer");
510
511        Ok(ObserverHandle::new(cancel_tx))
512    }
513}
514
515// ============================================================================
516// Internal wrapper types for storage with metadata
517// ============================================================================
518
/// Storage envelope for a published command.
///
/// This is the shape that `publish_command` converts into an Automerge
/// document and that the read paths decode again.
#[cfg(feature = "automerge-backend")]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct CommandWrapper {
    /// The command as handed to `publish_command`.
    command: HierarchicalCommand,
    /// Local wall-clock time of publication, in microseconds since the Unix epoch.
    published_at_us: u64,
}
525
/// Storage envelope for a command acknowledgment.
///
/// This is the shape that `publish_acknowledgment` converts into an
/// Automerge document and that the read paths decode again.
#[cfg(feature = "automerge-backend")]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct AckWrapper {
    /// The acknowledgment as handed to `publish_acknowledgment`.
    acknowledgment: CommandAcknowledgment,
    /// Local wall-clock time the acknowledgment was stored, in microseconds
    /// since the Unix epoch.
    received_at_us: u64,
}
532
/// Storage envelope for a command status record.
///
/// This is the shape that `update_command_status` converts into an Automerge
/// document and that `get_command_status` decodes again.
#[cfg(feature = "automerge-backend")]
#[derive(Clone, serde::Serialize, serde::Deserialize)]
struct StatusWrapper {
    /// The status as handed to `update_command_status`.
    status: CommandStatus,
    /// Local wall-clock time of the update, in microseconds since the Unix epoch.
    updated_at_us: u64,
}
539
540// ============================================================================
541// Tests
542// ============================================================================
543
544#[cfg(all(test, feature = "automerge-backend"))]
545mod tests {
546    use super::*;
547    use peat_schema::command::v1::{command_target, CommandTarget};
548    use tempfile::TempDir;
549
550    fn create_test_storage() -> (AutomergeCommandStorage, TempDir) {
551        let temp_dir = TempDir::new().expect("Failed to create temp dir");
552        let store = AutomergeStore::open(temp_dir.path()).expect("Failed to create store");
553        (AutomergeCommandStorage::new(Arc::new(store)), temp_dir)
554    }
555
556    fn create_test_command(command_id: &str, target_ids: Vec<String>) -> HierarchicalCommand {
557        HierarchicalCommand {
558            command_id: command_id.to_string(),
559            originator_id: "test-originator".to_string(),
560            target: Some(CommandTarget {
561                scope: command_target::Scope::Individual as i32,
562                target_ids,
563            }),
564            ..Default::default()
565        }
566    }
567
568    fn create_test_ack(command_id: &str, node_id: &str) -> CommandAcknowledgment {
569        CommandAcknowledgment {
570            command_id: command_id.to_string(),
571            node_id: node_id.to_string(),
572            ..Default::default()
573        }
574    }
575
576    fn create_test_status(command_id: &str, state: i32) -> CommandStatus {
577        CommandStatus {
578            command_id: command_id.to_string(),
579            state,
580            ..Default::default()
581        }
582    }
583
584    #[tokio::test]
585    async fn test_command_crud() {
586        let (storage, _temp) = create_test_storage();
587
588        // Create
589        let command = create_test_command("cmd-1", vec!["node-1".to_string()]);
590        let doc_id = storage.publish_command(&command).await.unwrap();
591        assert!(doc_id.starts_with("cmd:"));
592
593        // Read
594        let retrieved = storage.get_command("cmd-1").await.unwrap().unwrap();
595        assert_eq!(retrieved.command_id, "cmd-1");
596        assert_eq!(retrieved.originator_id, "test-originator");
597
598        // Query by target
599        let commands = storage.query_commands_by_target("node-1").await.unwrap();
600        assert_eq!(commands.len(), 1);
601        assert_eq!(commands[0].command_id, "cmd-1");
602
603        // Query miss
604        let empty = storage.query_commands_by_target("node-2").await.unwrap();
605        assert!(empty.is_empty());
606
607        // Delete
608        storage.delete_command("cmd-1").await.unwrap();
609        let deleted = storage.get_command("cmd-1").await.unwrap();
610        assert!(deleted.is_none());
611    }
612
613    #[tokio::test]
614    async fn test_acknowledgment_crud() {
615        let (storage, _temp) = create_test_storage();
616
617        // Publish command first
618        let command =
619            create_test_command("cmd-1", vec!["node-1".to_string(), "node-2".to_string()]);
620        storage.publish_command(&command).await.unwrap();
621
622        // Publish acknowledgments from multiple nodes
623        let ack1 = create_test_ack("cmd-1", "node-1");
624        let ack2 = create_test_ack("cmd-1", "node-2");
625
626        storage.publish_acknowledgment(&ack1).await.unwrap();
627        storage.publish_acknowledgment(&ack2).await.unwrap();
628
629        // Get all acknowledgments
630        let acks = storage.get_acknowledgments("cmd-1").await.unwrap();
631        assert_eq!(acks.len(), 2);
632
633        let node_ids: Vec<&str> = acks.iter().map(|a| a.node_id.as_str()).collect();
634        assert!(node_ids.contains(&"node-1"));
635        assert!(node_ids.contains(&"node-2"));
636    }
637
638    #[tokio::test]
639    async fn test_status_crud() {
640        let (storage, _temp) = create_test_storage();
641
642        // Initial status
643        let status1 = create_test_status("cmd-1", 1); // Pending
644        storage.update_command_status(&status1).await.unwrap();
645
646        let retrieved = storage.get_command_status("cmd-1").await.unwrap().unwrap();
647        assert_eq!(retrieved.command_id, "cmd-1");
648        assert_eq!(retrieved.state, 1);
649
650        // Update status (upsert semantics)
651        let status2 = create_test_status("cmd-1", 2); // Completed
652        storage.update_command_status(&status2).await.unwrap();
653
654        let updated = storage.get_command_status("cmd-1").await.unwrap().unwrap();
655        assert_eq!(updated.state, 2);
656    }
657
658    #[tokio::test]
659    async fn test_get_nonexistent() {
660        let (storage, _temp) = create_test_storage();
661
662        assert!(storage.get_command("nonexistent").await.unwrap().is_none());
663        assert!(storage
664            .get_command_status("nonexistent")
665            .await
666            .unwrap()
667            .is_none());
668        assert!(storage
669            .get_acknowledgments("nonexistent")
670            .await
671            .unwrap()
672            .is_empty());
673    }
674
675    #[tokio::test]
676    async fn test_multiple_commands() {
677        let (storage, _temp) = create_test_storage();
678
679        // Create multiple commands targeting different nodes
680        let cmd1 = create_test_command("cmd-1", vec!["node-1".to_string()]);
681        let cmd2 = create_test_command("cmd-2", vec!["node-1".to_string(), "node-2".to_string()]);
682        let cmd3 = create_test_command("cmd-3", vec!["node-2".to_string()]);
683
684        storage.publish_command(&cmd1).await.unwrap();
685        storage.publish_command(&cmd2).await.unwrap();
686        storage.publish_command(&cmd3).await.unwrap();
687
688        // Query by node-1: should get cmd-1 and cmd-2
689        let node1_cmds = storage.query_commands_by_target("node-1").await.unwrap();
690        assert_eq!(node1_cmds.len(), 2);
691
692        // Query by node-2: should get cmd-2 and cmd-3
693        let node2_cmds = storage.query_commands_by_target("node-2").await.unwrap();
694        assert_eq!(node2_cmds.len(), 2);
695    }
696}