1#[cfg(feature = "automerge-backend")]
7use crate::command::{CommandStorage, ObserverHandle};
8#[cfg(feature = "automerge-backend")]
9use crate::storage::automerge_conversion::{automerge_to_message, message_to_automerge};
10#[cfg(feature = "automerge-backend")]
11use crate::storage::automerge_store::AutomergeStore;
12#[cfg(feature = "automerge-backend")]
13use crate::Result;
14#[cfg(feature = "automerge-backend")]
15use async_trait::async_trait;
16#[cfg(feature = "automerge-backend")]
17use peat_schema::command::v1::{CommandAcknowledgment, CommandStatus, HierarchicalCommand};
18#[cfg(feature = "automerge-backend")]
19use std::sync::Arc;
20#[cfg(feature = "automerge-backend")]
21use std::time::{SystemTime, UNIX_EPOCH};
22#[cfg(feature = "automerge-backend")]
23use tracing::instrument;
24
25#[cfg(feature = "automerge-backend")]
46pub struct AutomergeCommandStorage {
47 store: Arc<AutomergeStore>,
48}
49
50#[cfg(feature = "automerge-backend")]
51impl AutomergeCommandStorage {
52 const COMMANDS_PREFIX: &'static str = "cmd:";
54
55 const ACKS_PREFIX: &'static str = "ack:";
57
58 const STATUS_PREFIX: &'static str = "status:";
60
61 pub fn new(store: Arc<AutomergeStore>) -> Self {
63 Self { store }
64 }
65
66 pub fn store(&self) -> &Arc<AutomergeStore> {
68 &self.store
69 }
70
71 fn command_key(command_id: &str) -> String {
72 format!("{}{}", Self::COMMANDS_PREFIX, command_id)
73 }
74
75 fn ack_key(command_id: &str, node_id: &str) -> String {
76 format!("{}{}-{}", Self::ACKS_PREFIX, command_id, node_id)
77 }
78
79 fn ack_prefix(command_id: &str) -> String {
80 format!("{}{}-", Self::ACKS_PREFIX, command_id)
81 }
82
83 fn status_key(command_id: &str) -> String {
84 format!("{}{}", Self::STATUS_PREFIX, command_id)
85 }
86
87 fn now_us() -> u64 {
88 SystemTime::now()
89 .duration_since(UNIX_EPOCH)
90 .unwrap_or_default()
91 .as_micros() as u64
92 }
93}
94
95#[cfg(feature = "automerge-backend")]
96#[async_trait]
97impl CommandStorage for AutomergeCommandStorage {
98 #[instrument(skip(self, command), fields(command_id = %command.command_id))]
103 async fn publish_command(&self, command: &HierarchicalCommand) -> Result<String> {
104 let key = Self::command_key(&command.command_id);
105
106 let wrapper = CommandWrapper {
108 command: command.clone(),
109 published_at_us: Self::now_us(),
110 };
111
112 let doc = message_to_automerge(&wrapper).map_err(|e| {
114 crate::Error::storage_error(
115 format!("Failed to convert command to Automerge: {}", e),
116 "publish_command",
117 Some(key.clone()),
118 )
119 })?;
120
121 self.store.put(&key, &doc).map_err(|e| {
122 crate::Error::storage_error(
123 format!("Failed to store command: {}", e),
124 "publish_command",
125 Some(key.clone()),
126 )
127 })?;
128
129 tracing::debug!(
130 command_id = %command.command_id,
131 key = %key,
132 "Published command to Automerge"
133 );
134
135 Ok(key)
136 }
137
138 #[instrument(skip(self), fields(command_id))]
139 async fn get_command(&self, command_id: &str) -> Result<Option<HierarchicalCommand>> {
140 let key = Self::command_key(command_id);
141
142 let doc = match self.store.get(&key) {
143 Ok(Some(doc)) => doc,
144 Ok(None) => return Ok(None),
145 Err(e) => {
146 return Err(crate::Error::storage_error(
147 format!("Failed to get command: {}", e),
148 "get_command",
149 Some(key),
150 ))
151 }
152 };
153
154 let wrapper: CommandWrapper = automerge_to_message(&doc).map_err(|e| {
155 crate::Error::storage_error(
156 format!("Failed to deserialize command: {}", e),
157 "get_command",
158 Some(key),
159 )
160 })?;
161
162 Ok(Some(wrapper.command))
163 }
164
165 #[instrument(skip(self), fields(target_id))]
166 async fn query_commands_by_target(&self, target_id: &str) -> Result<Vec<HierarchicalCommand>> {
167 let docs = self.store.scan_prefix(Self::COMMANDS_PREFIX).map_err(|e| {
169 crate::Error::storage_error(
170 format!("Failed to scan commands: {}", e),
171 "query_commands_by_target",
172 None,
173 )
174 })?;
175
176 let mut commands = Vec::new();
177 for (_key, doc) in docs {
178 if let Ok(wrapper) = automerge_to_message::<CommandWrapper>(&doc) {
179 if let Some(ref target) = wrapper.command.target {
181 if target.target_ids.contains(&target_id.to_string()) {
182 commands.push(wrapper.command);
183 }
184 }
185 }
186 }
187
188 Ok(commands)
189 }
190
191 #[instrument(skip(self), fields(command_id))]
192 async fn delete_command(&self, command_id: &str) -> Result<()> {
193 let key = Self::command_key(command_id);
194
195 self.store.delete(&key).map_err(|e| {
196 crate::Error::storage_error(
197 format!("Failed to delete command: {}", e),
198 "delete_command",
199 Some(key.clone()),
200 )
201 })?;
202
203 tracing::debug!(command_id = %command_id, "Deleted command from Automerge");
204
205 Ok(())
206 }
207
208 #[instrument(skip(self, ack), fields(command_id = %ack.command_id, node_id = %ack.node_id))]
213 async fn publish_acknowledgment(&self, ack: &CommandAcknowledgment) -> Result<String> {
214 let key = Self::ack_key(&ack.command_id, &ack.node_id);
215
216 let wrapper = AckWrapper {
218 acknowledgment: ack.clone(),
219 received_at_us: Self::now_us(),
220 };
221
222 let doc = message_to_automerge(&wrapper).map_err(|e| {
224 crate::Error::storage_error(
225 format!("Failed to convert acknowledgment to Automerge: {}", e),
226 "publish_acknowledgment",
227 Some(key.clone()),
228 )
229 })?;
230
231 self.store.put(&key, &doc).map_err(|e| {
232 crate::Error::storage_error(
233 format!("Failed to store acknowledgment: {}", e),
234 "publish_acknowledgment",
235 Some(key.clone()),
236 )
237 })?;
238
239 tracing::debug!(
240 command_id = %ack.command_id,
241 node_id = %ack.node_id,
242 key = %key,
243 "Published acknowledgment to Automerge"
244 );
245
246 Ok(key)
247 }
248
249 #[instrument(skip(self), fields(command_id))]
250 async fn get_acknowledgments(&self, command_id: &str) -> Result<Vec<CommandAcknowledgment>> {
251 let prefix = Self::ack_prefix(command_id);
252
253 let docs = self.store.scan_prefix(&prefix).map_err(|e| {
254 crate::Error::storage_error(
255 format!("Failed to scan acknowledgments: {}", e),
256 "get_acknowledgments",
257 Some(command_id.to_string()),
258 )
259 })?;
260
261 let mut acks = Vec::new();
262 for (_key, doc) in docs {
263 if let Ok(wrapper) = automerge_to_message::<AckWrapper>(&doc) {
264 acks.push(wrapper.acknowledgment);
265 }
266 }
267
268 Ok(acks)
269 }
270
271 #[instrument(skip(self, status), fields(command_id = %status.command_id))]
276 async fn update_command_status(&self, status: &CommandStatus) -> Result<()> {
277 let key = Self::status_key(&status.command_id);
278
279 let wrapper = StatusWrapper {
281 status: status.clone(),
282 updated_at_us: Self::now_us(),
283 };
284
285 let doc = message_to_automerge(&wrapper).map_err(|e| {
287 crate::Error::storage_error(
288 format!("Failed to convert status to Automerge: {}", e),
289 "update_command_status",
290 Some(key.clone()),
291 )
292 })?;
293
294 self.store.put(&key, &doc).map_err(|e| {
295 crate::Error::storage_error(
296 format!("Failed to store status: {}", e),
297 "update_command_status",
298 Some(key.clone()),
299 )
300 })?;
301
302 tracing::debug!(
303 command_id = %status.command_id,
304 state = status.state,
305 "Updated command status in Automerge"
306 );
307
308 Ok(())
309 }
310
311 #[instrument(skip(self), fields(command_id))]
312 async fn get_command_status(&self, command_id: &str) -> Result<Option<CommandStatus>> {
313 let key = Self::status_key(command_id);
314
315 let doc = match self.store.get(&key) {
316 Ok(Some(doc)) => doc,
317 Ok(None) => return Ok(None),
318 Err(e) => {
319 return Err(crate::Error::storage_error(
320 format!("Failed to get status: {}", e),
321 "get_command_status",
322 Some(key),
323 ))
324 }
325 };
326
327 let wrapper: StatusWrapper = automerge_to_message(&doc).map_err(|e| {
328 crate::Error::storage_error(
329 format!("Failed to deserialize status: {}", e),
330 "get_command_status",
331 Some(key),
332 )
333 })?;
334
335 Ok(Some(wrapper.status))
336 }
337
338 async fn observe_commands(
353 &self,
354 node_id: &str,
355 callback: Box<
356 dyn Fn(
357 HierarchicalCommand,
358 )
359 -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
360 + Send
361 + Sync,
362 >,
363 ) -> Result<ObserverHandle> {
364 let store = Arc::clone(&self.store);
365 let node_id = node_id.to_string();
366 let callback = Arc::new(callback);
367
368 let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel::<()>(1);
370
371 let poll_store = Arc::clone(&store);
373 let poll_node_id = node_id.clone();
374 let poll_callback = Arc::clone(&callback);
375
376 tokio::spawn(async move {
377 let mut seen_commands: std::collections::HashSet<String> =
378 std::collections::HashSet::new();
379
380 if let Ok(docs) = poll_store.scan_prefix(Self::COMMANDS_PREFIX) {
382 for (key, doc) in docs {
383 if let Ok(wrapper) = automerge_to_message::<CommandWrapper>(&doc) {
384 if let Some(ref target) = wrapper.command.target {
385 if target.target_ids.contains(&poll_node_id) {
386 seen_commands.insert(key);
387 let cmd = wrapper.command.clone();
388 let cb = Arc::clone(&poll_callback);
389 tokio::spawn(async move {
390 cb(cmd).await;
391 });
392 }
393 }
394 }
395 }
396 }
397
398 let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
400 loop {
401 tokio::select! {
402 _ = cancel_rx.recv() => {
403 tracing::debug!(node_id = %poll_node_id, "Command observer cancelled");
404 break;
405 }
406 _ = interval.tick() => {
407 if let Ok(docs) = poll_store.scan_prefix(Self::COMMANDS_PREFIX) {
408 for (key, doc) in docs {
409 if seen_commands.contains(&key) {
410 continue;
411 }
412 if let Ok(wrapper) = automerge_to_message::<CommandWrapper>(&doc) {
413 if let Some(ref target) = wrapper.command.target {
414 if target.target_ids.contains(&poll_node_id) {
415 seen_commands.insert(key);
416 let cmd = wrapper.command.clone();
417 let cb = Arc::clone(&poll_callback);
418 tokio::spawn(async move {
419 cb(cmd).await;
420 });
421 }
422 }
423 }
424 }
425 }
426 }
427 }
428 }
429 });
430
431 tracing::debug!(node_id = %node_id, "Registered command observer");
432
433 Ok(ObserverHandle::new(cancel_tx))
434 }
435
436 async fn observe_acknowledgments(
438 &self,
439 issuer_id: &str,
440 callback: Box<
441 dyn Fn(
442 CommandAcknowledgment,
443 )
444 -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
445 + Send
446 + Sync,
447 >,
448 ) -> Result<ObserverHandle> {
449 let store = Arc::clone(&self.store);
450 let issuer_id = issuer_id.to_string();
451 let callback = Arc::new(callback);
452
453 let (cancel_tx, mut cancel_rx) = tokio::sync::mpsc::channel::<()>(1);
455
456 let poll_store = Arc::clone(&store);
458 let poll_issuer_id = issuer_id.clone();
459 let poll_callback = Arc::clone(&callback);
460
461 tokio::spawn(async move {
462 let mut seen_acks: std::collections::HashSet<String> = std::collections::HashSet::new();
463
464 if let Ok(docs) = poll_store.scan_prefix(Self::ACKS_PREFIX) {
466 for (key, doc) in docs {
467 if let Ok(wrapper) = automerge_to_message::<AckWrapper>(&doc) {
468 seen_acks.insert(key);
471 let ack = wrapper.acknowledgment.clone();
472 let cb = Arc::clone(&poll_callback);
473 tokio::spawn(async move {
474 cb(ack).await;
475 });
476 }
477 }
478 }
479
480 let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(100));
482 loop {
483 tokio::select! {
484 _ = cancel_rx.recv() => {
485 tracing::debug!(issuer_id = %poll_issuer_id, "Acknowledgment observer cancelled");
486 break;
487 }
488 _ = interval.tick() => {
489 if let Ok(docs) = poll_store.scan_prefix(Self::ACKS_PREFIX) {
490 for (key, doc) in docs {
491 if seen_acks.contains(&key) {
492 continue;
493 }
494 if let Ok(wrapper) = automerge_to_message::<AckWrapper>(&doc) {
495 seen_acks.insert(key);
496 let ack = wrapper.acknowledgment.clone();
497 let cb = Arc::clone(&poll_callback);
498 tokio::spawn(async move {
499 cb(ack).await;
500 });
501 }
502 }
503 }
504 }
505 }
506 }
507 });
508
509 tracing::debug!(issuer_id = %issuer_id, "Registered acknowledgment observer");
510
511 Ok(ObserverHandle::new(cancel_tx))
512 }
513}
514
515#[cfg(feature = "automerge-backend")]
520#[derive(serde::Serialize, serde::Deserialize, Clone)]
521struct CommandWrapper {
522 command: HierarchicalCommand,
523 published_at_us: u64,
524}
525
526#[cfg(feature = "automerge-backend")]
527#[derive(serde::Serialize, serde::Deserialize, Clone)]
528struct AckWrapper {
529 acknowledgment: CommandAcknowledgment,
530 received_at_us: u64,
531}
532
533#[cfg(feature = "automerge-backend")]
534#[derive(serde::Serialize, serde::Deserialize, Clone)]
535struct StatusWrapper {
536 status: CommandStatus,
537 updated_at_us: u64,
538}
539
540#[cfg(all(test, feature = "automerge-backend"))]
545mod tests {
546 use super::*;
547 use peat_schema::command::v1::{command_target, CommandTarget};
548 use tempfile::TempDir;
549
550 fn create_test_storage() -> (AutomergeCommandStorage, TempDir) {
551 let temp_dir = TempDir::new().expect("Failed to create temp dir");
552 let store = AutomergeStore::open(temp_dir.path()).expect("Failed to create store");
553 (AutomergeCommandStorage::new(Arc::new(store)), temp_dir)
554 }
555
556 fn create_test_command(command_id: &str, target_ids: Vec<String>) -> HierarchicalCommand {
557 HierarchicalCommand {
558 command_id: command_id.to_string(),
559 originator_id: "test-originator".to_string(),
560 target: Some(CommandTarget {
561 scope: command_target::Scope::Individual as i32,
562 target_ids,
563 }),
564 ..Default::default()
565 }
566 }
567
568 fn create_test_ack(command_id: &str, node_id: &str) -> CommandAcknowledgment {
569 CommandAcknowledgment {
570 command_id: command_id.to_string(),
571 node_id: node_id.to_string(),
572 ..Default::default()
573 }
574 }
575
576 fn create_test_status(command_id: &str, state: i32) -> CommandStatus {
577 CommandStatus {
578 command_id: command_id.to_string(),
579 state,
580 ..Default::default()
581 }
582 }
583
584 #[tokio::test]
585 async fn test_command_crud() {
586 let (storage, _temp) = create_test_storage();
587
588 let command = create_test_command("cmd-1", vec!["node-1".to_string()]);
590 let doc_id = storage.publish_command(&command).await.unwrap();
591 assert!(doc_id.starts_with("cmd:"));
592
593 let retrieved = storage.get_command("cmd-1").await.unwrap().unwrap();
595 assert_eq!(retrieved.command_id, "cmd-1");
596 assert_eq!(retrieved.originator_id, "test-originator");
597
598 let commands = storage.query_commands_by_target("node-1").await.unwrap();
600 assert_eq!(commands.len(), 1);
601 assert_eq!(commands[0].command_id, "cmd-1");
602
603 let empty = storage.query_commands_by_target("node-2").await.unwrap();
605 assert!(empty.is_empty());
606
607 storage.delete_command("cmd-1").await.unwrap();
609 let deleted = storage.get_command("cmd-1").await.unwrap();
610 assert!(deleted.is_none());
611 }
612
613 #[tokio::test]
614 async fn test_acknowledgment_crud() {
615 let (storage, _temp) = create_test_storage();
616
617 let command =
619 create_test_command("cmd-1", vec!["node-1".to_string(), "node-2".to_string()]);
620 storage.publish_command(&command).await.unwrap();
621
622 let ack1 = create_test_ack("cmd-1", "node-1");
624 let ack2 = create_test_ack("cmd-1", "node-2");
625
626 storage.publish_acknowledgment(&ack1).await.unwrap();
627 storage.publish_acknowledgment(&ack2).await.unwrap();
628
629 let acks = storage.get_acknowledgments("cmd-1").await.unwrap();
631 assert_eq!(acks.len(), 2);
632
633 let node_ids: Vec<&str> = acks.iter().map(|a| a.node_id.as_str()).collect();
634 assert!(node_ids.contains(&"node-1"));
635 assert!(node_ids.contains(&"node-2"));
636 }
637
638 #[tokio::test]
639 async fn test_status_crud() {
640 let (storage, _temp) = create_test_storage();
641
642 let status1 = create_test_status("cmd-1", 1); storage.update_command_status(&status1).await.unwrap();
645
646 let retrieved = storage.get_command_status("cmd-1").await.unwrap().unwrap();
647 assert_eq!(retrieved.command_id, "cmd-1");
648 assert_eq!(retrieved.state, 1);
649
650 let status2 = create_test_status("cmd-1", 2); storage.update_command_status(&status2).await.unwrap();
653
654 let updated = storage.get_command_status("cmd-1").await.unwrap().unwrap();
655 assert_eq!(updated.state, 2);
656 }
657
658 #[tokio::test]
659 async fn test_get_nonexistent() {
660 let (storage, _temp) = create_test_storage();
661
662 assert!(storage.get_command("nonexistent").await.unwrap().is_none());
663 assert!(storage
664 .get_command_status("nonexistent")
665 .await
666 .unwrap()
667 .is_none());
668 assert!(storage
669 .get_acknowledgments("nonexistent")
670 .await
671 .unwrap()
672 .is_empty());
673 }
674
675 #[tokio::test]
676 async fn test_multiple_commands() {
677 let (storage, _temp) = create_test_storage();
678
679 let cmd1 = create_test_command("cmd-1", vec!["node-1".to_string()]);
681 let cmd2 = create_test_command("cmd-2", vec!["node-1".to_string(), "node-2".to_string()]);
682 let cmd3 = create_test_command("cmd-3", vec!["node-2".to_string()]);
683
684 storage.publish_command(&cmd1).await.unwrap();
685 storage.publish_command(&cmd2).await.unwrap();
686 storage.publish_command(&cmd3).await.unwrap();
687
688 let node1_cmds = storage.query_commands_by_target("node-1").await.unwrap();
690 assert_eq!(node1_cmds.len(), 2);
691
692 let node2_cmds = storage.query_commands_by_target("node-2").await.unwrap();
694 assert_eq!(node2_cmds.len(), 2);
695 }
696}