1use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::Instrument;
37
38use crate::channels::priority_queue::PriorityQueue;
39use crate::channels::{ComponentStatus, QueryResult};
40use crate::component_graph::ComponentStatusHandle;
41use crate::context::ReactionRuntimeContext;
42use crate::identity::IdentityProvider;
43use crate::reactions::checkpoint::ReactionCheckpoint;
44use crate::recovery::ReactionRecoveryPolicy;
45use crate::state_store::StateStoreProvider;
46
47#[derive(Debug, Clone)]
64pub struct ReactionBaseParams {
65 pub id: String,
67 pub queries: Vec<String>,
69 pub priority_queue_capacity: Option<usize>,
71 pub auto_start: bool,
73 pub recovery_policy: Option<ReactionRecoveryPolicy>,
75}
76
77impl ReactionBaseParams {
78 pub fn new(id: impl Into<String>, queries: Vec<String>) -> Self {
80 Self {
81 id: id.into(),
82 queries,
83 priority_queue_capacity: None,
84 auto_start: true, recovery_policy: None,
86 }
87 }
88
89 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
91 self.priority_queue_capacity = Some(capacity);
92 self
93 }
94
95 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
97 self.auto_start = auto_start;
98 self
99 }
100
101 pub fn with_recovery_policy(mut self, policy: ReactionRecoveryPolicy) -> Self {
103 self.recovery_policy = Some(policy);
104 self
105 }
106}
107
108pub struct ReactionBase {
110 pub id: String,
112 pub queries: Vec<String>,
114 pub auto_start: bool,
116 pub recovery_policy: Option<ReactionRecoveryPolicy>,
118 status_handle: ComponentStatusHandle,
120 context: Arc<RwLock<Option<ReactionRuntimeContext>>>,
122 state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
124 pub priority_queue: PriorityQueue<QueryResult>,
126 pub subscription_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
128 pub processing_task: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
130 pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
132 identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
136 raw_config: Option<serde_json::Value>,
139}
140
141impl ReactionBase {
142 pub fn new(params: ReactionBaseParams) -> Self {
147 Self {
148 priority_queue: PriorityQueue::new(params.priority_queue_capacity.unwrap_or(10000)),
149 id: params.id.clone(),
150 queries: params.queries,
151 auto_start: params.auto_start,
152 recovery_policy: params.recovery_policy,
153 status_handle: ComponentStatusHandle::new(¶ms.id),
154 context: Arc::new(RwLock::new(None)), state_store: Arc::new(RwLock::new(None)), subscription_tasks: Arc::new(RwLock::new(Vec::new())),
157 processing_task: Arc::new(RwLock::new(None)),
158 shutdown_tx: Arc::new(RwLock::new(None)),
159 identity_provider: Arc::new(RwLock::new(None)),
160 raw_config: None,
161 }
162 }
163
164 pub async fn initialize(&self, context: ReactionRuntimeContext) {
174 *self.context.write().await = Some(context.clone());
176
177 self.status_handle.wire(context.update_tx.clone()).await;
179
180 if let Some(state_store) = context.state_store.as_ref() {
181 *self.state_store.write().await = Some(state_store.clone());
182 }
183
184 if let Some(ip) = context.identity_provider.as_ref() {
186 let mut guard = self.identity_provider.write().await;
187 if guard.is_none() {
188 *guard = Some(ip.clone());
189 }
190 }
191 }
192
193 pub async fn context(&self) -> Option<ReactionRuntimeContext> {
197 self.context.read().await.clone()
198 }
199
200 pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
204 self.state_store.read().await.clone()
205 }
206
207 pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
213 self.identity_provider.read().await.clone()
214 }
215
216 pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
222 *self.identity_provider.write().await = Some(provider);
223 }
224
225 pub fn get_auto_start(&self) -> bool {
227 self.auto_start
228 }
229
230 pub fn set_raw_config(&mut self, config: serde_json::Value) {
232 self.raw_config = Some(config);
233 }
234
235 pub fn raw_config(&self) -> Option<&serde_json::Value> {
237 self.raw_config.as_ref()
238 }
239
240 pub fn properties_or_serialize<D: serde::Serialize>(
248 &self,
249 fallback_dto: &D,
250 ) -> std::collections::HashMap<String, serde_json::Value> {
251 if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
252 return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
253 }
254
255 match serde_json::to_value(fallback_dto) {
256 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
257 _ => std::collections::HashMap::new(),
258 }
259 }
260
261 pub fn clone_shared(&self) -> Self {
266 Self {
267 id: self.id.clone(),
268 queries: self.queries.clone(),
269 auto_start: self.auto_start,
270 recovery_policy: self.recovery_policy,
271 status_handle: self.status_handle.clone(),
272 context: self.context.clone(),
273 state_store: self.state_store.clone(),
274 priority_queue: self.priority_queue.clone(),
275 subscription_tasks: self.subscription_tasks.clone(),
276 processing_task: self.processing_task.clone(),
277 shutdown_tx: self.shutdown_tx.clone(),
278 identity_provider: self.identity_provider.clone(),
279 raw_config: self.raw_config.clone(),
280 }
281 }
282
283 pub async fn create_shutdown_channel(&self) -> tokio::sync::oneshot::Receiver<()> {
290 let (tx, rx) = tokio::sync::oneshot::channel();
291 *self.shutdown_tx.write().await = Some(tx);
292 rx
293 }
294
295 pub fn get_id(&self) -> &str {
297 &self.id
298 }
299
300 pub fn get_queries(&self) -> &[String] {
302 &self.queries
303 }
304
305 pub async fn get_status(&self) -> ComponentStatus {
307 self.status_handle.get_status().await
308 }
309
310 pub fn status_handle(&self) -> ComponentStatusHandle {
315 self.status_handle.clone()
316 }
317
318 pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
322 self.status_handle.set_status(status, message).await;
323 }
324
325 pub async fn enqueue_query_result(&self, result: QueryResult) -> anyhow::Result<()> {
330 self.priority_queue.enqueue_wait(Arc::new(result)).await;
331 Ok(())
332 }
333
334 pub async fn read_checkpoint(&self, query_id: &str) -> Result<Option<ReactionCheckpoint>> {
343 let store = self.state_store.read().await;
344 match store.as_ref() {
345 Some(s) => {
346 crate::reactions::checkpoint::read_checkpoint(s.as_ref(), &self.id, query_id).await
347 }
348 None => Ok(None),
349 }
350 }
351
352 pub async fn read_all_checkpoints(
357 &self,
358 ) -> Result<std::collections::HashMap<String, ReactionCheckpoint>> {
359 let store = self.state_store.read().await;
360 match store.as_ref() {
361 Some(s) => {
362 crate::reactions::checkpoint::read_checkpoints_batch(
363 s.as_ref(),
364 &self.id,
365 &self.queries,
366 )
367 .await
368 }
369 None => Ok(std::collections::HashMap::new()),
370 }
371 }
372
373 pub async fn write_checkpoint(
378 &self,
379 query_id: &str,
380 checkpoint: &ReactionCheckpoint,
381 ) -> Result<()> {
382 let store = self.state_store.read().await;
383 let store = store.as_ref().ok_or_else(|| {
384 anyhow::anyhow!("No state store configured — cannot write checkpoint")
385 })?;
386 crate::reactions::checkpoint::write_checkpoint(
387 store.as_ref(),
388 &self.id,
389 query_id,
390 checkpoint,
391 )
392 .await
393 }
394
395 pub async fn stop_common(&self) -> Result<()> {
403 info!("Stopping reaction: {}", self.id);
404
405 if let Some(tx) = self.shutdown_tx.write().await.take() {
407 let _ = tx.send(());
408 }
409
410 let mut subscription_tasks = self.subscription_tasks.write().await;
412 for task in subscription_tasks.drain(..) {
413 task.abort();
414 }
415 drop(subscription_tasks);
416
417 let mut processing_task = self.processing_task.write().await;
419 if let Some(mut task) = processing_task.take() {
420 match tokio::time::timeout(std::time::Duration::from_secs(2), &mut task).await {
422 Ok(Ok(())) => {
423 debug!("[{}] Processing task completed gracefully", self.id);
424 }
425 Ok(Err(e)) => {
426 debug!("[{}] Processing task ended: {}", self.id, e);
428 }
429 Err(_) => {
430 warn!(
432 "[{}] Processing task did not respond to shutdown signal within timeout, aborting",
433 self.id
434 );
435 task.abort();
436 }
437 }
438 }
439 drop(processing_task);
440
441 let drained_events = self.priority_queue.drain().await;
443 if !drained_events.is_empty() {
444 info!(
445 "[{}] Drained {} pending events from priority queue",
446 self.id,
447 drained_events.len()
448 );
449 }
450
451 self.set_status(
452 ComponentStatus::Stopped,
453 Some(format!("Reaction '{}' stopped", self.id)),
454 )
455 .await;
456 info!("Reaction '{}' stopped", self.id);
457
458 Ok(())
459 }
460
461 pub async fn deprovision_common(&self) -> Result<()> {
467 info!("Deprovisioning reaction '{}'", self.id);
468 if let Some(store) = self.state_store().await {
469 let count = store.clear_store(&self.id).await.map_err(|e| {
470 anyhow::anyhow!(
471 "Failed to clear state store for reaction '{}': {}",
472 self.id,
473 e
474 )
475 })?;
476 info!(
477 "Cleared {} keys from state store for reaction '{}'",
478 count, self.id
479 );
480 }
481 Ok(())
482 }
483
484 pub async fn set_processing_task(&self, task: tokio::task::JoinHandle<()>) {
486 *self.processing_task.write().await = Some(task);
487 }
488
489 pub async fn run_standard_loop<F, Fut>(
514 &self,
515 mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
516 initial_checkpoints: std::collections::HashMap<String, ReactionCheckpoint>,
517 handler: F,
518 ) -> Result<()>
519 where
520 F: Fn(Arc<crate::channels::QueryResult>) -> Fut + Send + Sync,
521 Fut: std::future::Future<Output = Result<()>> + Send,
522 {
523 let mut checkpoints = initial_checkpoints;
524
525 loop {
526 let event = tokio::select! {
527 biased;
528 _ = &mut shutdown_rx => {
529 break;
530 }
531 event = self.priority_queue.dequeue() => event,
532 };
533
534 let query_id = &event.query_id;
535 let seq = event.sequence;
536
537 if let Some(cp) = checkpoints.get(query_id) {
539 if seq <= cp.sequence {
540 debug!(
541 "[{}] Skipping already-processed event: query={}, seq={} (checkpoint={})",
542 self.id, query_id, seq, cp.sequence
543 );
544 continue;
545 }
546 }
547
548 if let Err(e) = handler(Arc::clone(&event)).await {
550 error!(
551 "[{}] Handler error for query={}, seq={}: {:#}",
552 self.id, query_id, seq, e
553 );
554 continue;
557 }
558
559 let config_hash = checkpoints
561 .get(query_id)
562 .map(|cp| cp.config_hash)
563 .unwrap_or(0);
564 let cp = ReactionCheckpoint {
565 sequence: seq,
566 config_hash,
567 };
568 self.write_checkpoint(query_id, &cp).await?;
569 checkpoints.insert(query_id.clone(), cp);
570 }
571
572 Ok(())
573 }
574}
575
576#[cfg(test)]
577mod tests {
578 use super::*;
579 use std::sync::atomic::{AtomicBool, Ordering};
580 use std::time::Duration;
581 use tokio::sync::mpsc;
582
583 #[tokio::test]
584 async fn test_reaction_base_creation() {
585 let params = ReactionBaseParams::new("test-reaction", vec!["query1".to_string()])
586 .with_priority_queue_capacity(5000);
587
588 let base = ReactionBase::new(params);
589 assert_eq!(base.id, "test-reaction");
590 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
591 }
592
593 #[tokio::test]
594 async fn test_status_transitions() {
595 use crate::context::ReactionRuntimeContext;
596
597 let (graph, _rx) = crate::component_graph::ComponentGraph::new("test-instance");
598 let update_tx = graph.update_sender();
599 let graph = Arc::new(RwLock::new(graph));
600 let params = ReactionBaseParams::new("test-reaction", vec![]);
601
602 let base = ReactionBase::new(params);
603
604 let context =
606 ReactionRuntimeContext::new("test-instance", "test-reaction", None, update_tx, None);
607 base.initialize(context).await;
608
609 base.set_status(ComponentStatus::Starting, Some("Starting test".to_string()))
611 .await;
612
613 assert_eq!(base.get_status().await, ComponentStatus::Starting);
614
615 let mut event_rx = graph.read().await.subscribe();
617 base.set_status(ComponentStatus::Running, Some("Running test".to_string()))
619 .await;
620
621 assert_eq!(base.get_status().await, ComponentStatus::Running);
622 }
623
624 #[tokio::test]
625 async fn test_priority_queue_operations() {
626 let params =
627 ReactionBaseParams::new("test-reaction", vec![]).with_priority_queue_capacity(10);
628
629 let base = ReactionBase::new(params);
630
631 let query_result = QueryResult::new(
633 "test-query".to_string(),
634 0,
635 chrono::Utc::now(),
636 vec![],
637 Default::default(),
638 );
639
640 let enqueued = base.priority_queue.enqueue(Arc::new(query_result)).await;
642 assert!(enqueued);
643
644 let drained = base.priority_queue.drain().await;
646 assert_eq!(drained.len(), 1);
647 }
648
649 #[tokio::test]
650 async fn test_event_without_initialization() {
651 let params = ReactionBaseParams::new("test-reaction", vec![]);
653
654 let base = ReactionBase::new(params);
655
656 base.set_status(ComponentStatus::Starting, None).await;
658 }
659
660 #[tokio::test]
665 async fn test_create_shutdown_channel() {
666 let params = ReactionBaseParams::new("test-reaction", vec![]);
667 let base = ReactionBase::new(params);
668
669 assert!(base.shutdown_tx.read().await.is_none());
671
672 let rx = base.create_shutdown_channel().await;
674
675 assert!(base.shutdown_tx.read().await.is_some());
677
678 drop(rx);
680 }
681
682 #[tokio::test]
683 async fn test_shutdown_channel_signal() {
684 let params = ReactionBaseParams::new("test-reaction", vec![]);
685 let base = ReactionBase::new(params);
686
687 let mut rx = base.create_shutdown_channel().await;
688
689 if let Some(tx) = base.shutdown_tx.write().await.take() {
691 tx.send(()).unwrap();
692 }
693
694 let result = rx.try_recv();
696 assert!(result.is_ok());
697 }
698
699 #[tokio::test]
700 async fn test_shutdown_channel_replaced_on_second_create() {
701 let params = ReactionBaseParams::new("test-reaction", vec![]);
702 let base = ReactionBase::new(params);
703
704 let _rx1 = base.create_shutdown_channel().await;
706
707 let mut rx2 = base.create_shutdown_channel().await;
709
710 if let Some(tx) = base.shutdown_tx.write().await.take() {
712 tx.send(()).unwrap();
713 }
714
715 let result = rx2.try_recv();
717 assert!(result.is_ok());
718 }
719
720 #[tokio::test]
721 async fn test_stop_common_sends_shutdown_signal() {
722 let params = ReactionBaseParams::new("test-reaction", vec![]);
723 let base = ReactionBase::new(params);
724
725 let mut rx = base.create_shutdown_channel().await;
726
727 let shutdown_received = Arc::new(AtomicBool::new(false));
729 let shutdown_flag = shutdown_received.clone();
730
731 let task = tokio::spawn(async move {
732 tokio::select! {
733 _ = &mut rx => {
734 shutdown_flag.store(true, Ordering::SeqCst);
735 }
736 }
737 });
738
739 base.set_processing_task(task).await;
740
741 let _ = base.stop_common().await;
743
744 assert!(
746 shutdown_received.load(Ordering::SeqCst),
747 "Processing task should have received shutdown signal"
748 );
749 }
750
751 #[tokio::test]
752 async fn test_graceful_shutdown_timing() {
753 let params = ReactionBaseParams::new("test-reaction", vec![]);
754 let base = ReactionBase::new(params);
755
756 let rx = base.create_shutdown_channel().await;
757
758 let task = tokio::spawn(async move {
760 let mut shutdown_rx = rx;
761 loop {
762 tokio::select! {
763 biased;
764 _ = &mut shutdown_rx => {
765 break;
766 }
767 _ = tokio::time::sleep(Duration::from_secs(10)) => {
768 }
770 }
771 }
772 });
773
774 base.set_processing_task(task).await;
775
776 let start = std::time::Instant::now();
778 let _ = base.stop_common().await;
779 let elapsed = start.elapsed();
780
781 assert!(
783 elapsed < Duration::from_millis(500),
784 "Shutdown took {elapsed:?}, expected < 500ms. Task may not be responding to shutdown signal."
785 );
786 }
787
788 #[tokio::test]
789 async fn test_stop_common_without_shutdown_channel() {
790 let params = ReactionBaseParams::new("test-reaction", vec![]);
792 let base = ReactionBase::new(params);
793
794 let task = tokio::spawn(async {
796 tokio::time::sleep(Duration::from_millis(10)).await;
797 });
798
799 base.set_processing_task(task).await;
800
801 let result = base.stop_common().await;
803 assert!(result.is_ok());
804 }
805
806 #[tokio::test]
811 async fn test_get_id() {
812 let params = ReactionBaseParams::new("my-reaction-42", vec![]);
813 let base = ReactionBase::new(params);
814 assert_eq!(base.get_id(), "my-reaction-42");
815 }
816
817 #[tokio::test]
818 async fn test_get_queries() {
819 let queries = vec!["query-a".to_string(), "query-b".to_string(), "query-c".to_string()];
820 let params = ReactionBaseParams::new("r1", queries.clone());
821 let base = ReactionBase::new(params);
822 assert_eq!(base.get_queries(), &queries[..]);
823 }
824
825 #[tokio::test]
826 async fn test_get_queries_empty() {
827 let params = ReactionBaseParams::new("r1", vec![]);
828 let base = ReactionBase::new(params);
829 assert!(base.get_queries().is_empty());
830 }
831
832 #[tokio::test]
833 async fn test_get_auto_start_default_true() {
834 let params = ReactionBaseParams::new("r1", vec![]);
835 let base = ReactionBase::new(params);
836 assert!(base.get_auto_start());
837 }
838
839 #[tokio::test]
840 async fn test_get_auto_start_override_false() {
841 let params = ReactionBaseParams::new("r1", vec![]).with_auto_start(false);
842 let base = ReactionBase::new(params);
843 assert!(!base.get_auto_start());
844 }
845
846 #[tokio::test]
851 async fn test_context_none_before_initialize() {
852 let params = ReactionBaseParams::new("r1", vec![]);
853 let base = ReactionBase::new(params);
854 assert!(base.context().await.is_none());
855 }
856
857 #[tokio::test]
858 async fn test_context_some_after_initialize() {
859 let (graph, _rx) = crate::component_graph::ComponentGraph::new("inst");
860 let update_tx = graph.update_sender();
861 let context = ReactionRuntimeContext::new("inst", "r1", None, update_tx, None);
862
863 let params = ReactionBaseParams::new("r1", vec![]);
864 let base = ReactionBase::new(params);
865 base.initialize(context).await;
866
867 let ctx = base.context().await;
868 assert!(ctx.is_some());
869 assert_eq!(ctx.unwrap().reaction_id, "r1");
870 }
871
872 #[tokio::test]
873 async fn test_state_store_none_when_not_configured() {
874 let params = ReactionBaseParams::new("r1", vec![]);
875 let base = ReactionBase::new(params);
876 assert!(base.state_store().await.is_none());
877 }
878
879 #[tokio::test]
880 async fn test_state_store_none_after_initialize_without_store() {
881 let (graph, _rx) = crate::component_graph::ComponentGraph::new("inst");
882 let update_tx = graph.update_sender();
883 let context = ReactionRuntimeContext::new("inst", "r1", None, update_tx, None);
884
885 let params = ReactionBaseParams::new("r1", vec![]);
886 let base = ReactionBase::new(params);
887 base.initialize(context).await;
888
889 assert!(base.state_store().await.is_none());
890 }
891
892 #[tokio::test]
893 async fn test_identity_provider_none_by_default() {
894 let params = ReactionBaseParams::new("r1", vec![]);
895 let base = ReactionBase::new(params);
896 assert!(base.identity_provider().await.is_none());
897 }
898
899 #[tokio::test]
904 async fn test_status_handle_returns_handle() {
905 let params = ReactionBaseParams::new("r1", vec![]);
906 let base = ReactionBase::new(params);
907
908 let handle = base.status_handle();
909 assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
911
912 handle.set_status(ComponentStatus::Running, None).await;
914 assert_eq!(base.get_status().await, ComponentStatus::Running);
915 }
916
917 #[tokio::test]
922 async fn test_deprovision_common_noop_without_state_store() {
923 let params = ReactionBaseParams::new("r1", vec![]);
924 let base = ReactionBase::new(params);
925 let result = base.deprovision_common().await;
927 assert!(result.is_ok());
928 }
929
930 #[tokio::test]
935 async fn test_set_processing_task_stores_handle() {
936 let params = ReactionBaseParams::new("r1", vec![]);
937 let base = ReactionBase::new(params);
938
939 assert!(base.processing_task.read().await.is_none());
941
942 let task = tokio::spawn(async {
943 tokio::time::sleep(Duration::from_secs(60)).await;
944 });
945
946 base.set_processing_task(task).await;
947
948 assert!(base.processing_task.read().await.is_some());
950
951 let task = base.processing_task.write().await.take();
953 if let Some(t) = task {
954 t.abort();
955 }
956 }
957
958 #[tokio::test]
959 async fn test_checkpoint_read_write_round_trip() {
960 let (graph, _rx) = crate::component_graph::ComponentGraph::new("test-instance");
961 let update_tx = graph.update_sender();
962
963 let params =
964 ReactionBaseParams::new("ckpt-reaction", vec!["q1".to_string(), "q2".to_string()]);
965 let base = ReactionBase::new(params);
966
967 let store: Arc<dyn StateStoreProvider> =
969 Arc::new(crate::state_store::MemoryStateStoreProvider::new());
970 let context = crate::context::ReactionRuntimeContext::new(
971 "test-instance",
972 "ckpt-reaction",
973 Some(store),
974 update_tx,
975 None,
976 );
977 base.initialize(context).await;
978
979 assert!(base.read_checkpoint("q1").await.unwrap().is_none());
981 assert!(base.read_all_checkpoints().await.unwrap().is_empty());
982
983 let cp1 = ReactionCheckpoint {
985 sequence: 10,
986 config_hash: 42,
987 };
988 base.write_checkpoint("q1", &cp1).await.unwrap();
989
990 let read = base.read_checkpoint("q1").await.unwrap().unwrap();
992 assert_eq!(read, cp1);
993
994 assert!(base.read_checkpoint("q2").await.unwrap().is_none());
996
997 let cp2 = ReactionCheckpoint {
999 sequence: 20,
1000 config_hash: 99,
1001 };
1002 base.write_checkpoint("q2", &cp2).await.unwrap();
1003
1004 let all = base.read_all_checkpoints().await.unwrap();
1005 assert_eq!(all.len(), 2);
1006 assert_eq!(all["q1"], cp1);
1007 assert_eq!(all["q2"], cp2);
1008 }
1009
1010 #[tokio::test]
1011 async fn test_checkpoint_no_state_store() {
1012 let params = ReactionBaseParams::new("no-store", vec!["q1".to_string()]);
1014 let base = ReactionBase::new(params);
1015
1016 assert!(base.read_checkpoint("q1").await.unwrap().is_none());
1018
1019 assert!(base.read_all_checkpoints().await.unwrap().is_empty());
1021
1022 let cp = ReactionCheckpoint {
1024 sequence: 1,
1025 config_hash: 0,
1026 };
1027 assert!(base.write_checkpoint("q1", &cp).await.is_err());
1028 }
1029
1030 #[tokio::test]
1031 async fn test_run_standard_loop_dedup_and_checkpoint() {
1032 use std::sync::atomic::{AtomicU64, Ordering};
1033
1034 let (graph, _rx) = crate::component_graph::ComponentGraph::new("test-instance");
1035 let update_tx = graph.update_sender();
1036
1037 let params = ReactionBaseParams::new("loop-reaction", vec!["q1".to_string()]);
1038 let base = ReactionBase::new(params);
1039
1040 let store: Arc<dyn StateStoreProvider> =
1041 Arc::new(crate::state_store::MemoryStateStoreProvider::new());
1042 let context = crate::context::ReactionRuntimeContext::new(
1043 "test-instance",
1044 "loop-reaction",
1045 Some(store),
1046 update_tx,
1047 None,
1048 );
1049 base.initialize(context).await;
1050
1051 let initial_checkpoints = {
1053 let mut m = std::collections::HashMap::new();
1054 m.insert(
1055 "q1".to_string(),
1056 ReactionCheckpoint {
1057 sequence: 5,
1058 config_hash: 42,
1059 },
1060 );
1061 m
1062 };
1063
1064 for seq in [3u64, 5, 6, 7] {
1066 let result = crate::channels::QueryResult {
1067 query_id: "q1".to_string(),
1068 sequence: seq,
1069 timestamp: chrono::Utc::now(),
1070 results: vec![],
1071 metadata: Default::default(),
1072 profiling: None,
1073 };
1074 base.enqueue_query_result(result).await.unwrap();
1075 }
1076
1077 let processed = Arc::new(tokio::sync::Mutex::new(Vec::<u64>::new()));
1079 let processed_clone = processed.clone();
1080 let handler_count = Arc::new(AtomicU64::new(0));
1081 let handler_count_clone = handler_count.clone();
1082
1083 let shutdown_rx = base.create_shutdown_channel().await;
1084 let base_clone = base.clone_shared();
1085
1086 let loop_handle = tokio::spawn(async move {
1087 base_clone
1088 .run_standard_loop(shutdown_rx, initial_checkpoints, |event| {
1089 let processed = processed_clone.clone();
1090 let count = handler_count_clone.clone();
1091 async move {
1092 processed.lock().await.push(event.sequence);
1093 count.fetch_add(1, Ordering::SeqCst);
1094 Ok(())
1095 }
1096 })
1097 .await
1098 .unwrap();
1099 });
1100
1101 for _ in 0..50 {
1103 if handler_count.load(Ordering::SeqCst) >= 2 {
1104 break;
1105 }
1106 tokio::time::sleep(Duration::from_millis(50)).await;
1107 }
1108
1109 let _ = base.stop_common().await;
1111 let _ = tokio::time::timeout(Duration::from_secs(2), loop_handle).await;
1112
1113 let processed = processed.lock().await;
1115 assert_eq!(*processed, vec![6, 7]);
1116
1117 let cp = base.read_checkpoint("q1").await.unwrap().unwrap();
1119 assert_eq!(cp.sequence, 7);
1120 assert_eq!(cp.config_hash, 42);
1121 }
1122}