use ankurah_proto::{self as proto, CollectionId};
use anyhow::anyhow;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use tracing::{debug, warn};
use crate::error::{RequestError, RetrievalError};
use crate::node::ContextData;
use crate::util::safeset::SafeSet;
#[async_trait::async_trait]
pub trait RemoteQuerySubscriber: Clone + Send + Sync + 'static {
async fn subscription_established(&self, version: u32);
fn set_last_error(&self, error: RetrievalError);
}
/// Where a query stands with respect to its registration on a remote peer.
#[derive(Debug, Clone)]
pub enum Status {
    /// Not registered on any peer; the next setup pass will pick it up.
    PendingRemote,
    /// A subscribe request for the given version was handed off to the given
    /// peer and has not completed yet.
    Requested(proto::EntityId, u32),
    /// The given peer accepted the subscription at the given version.
    Established(proto::EntityId, u32),
    /// NOTE(review): never constructed in this file — presumably reserved for
    /// an in-flight update of an established subscription; confirm.
    PendingUpdate(proto::EntityId, u32),
    /// The last attempt failed with a non-retryable error; setup passes skip
    /// queries in this state.
    Failed,
}
/// Immutable description of one version of a query, shared by `Arc` between
/// the subscription table and the tasks that send it to a peer.
#[derive(Debug)]
pub struct Content<CD: ContextData> {
    /// Identifier of the query this content belongs to.
    pub query_id: proto::QueryId,
    /// Collection the query runs against.
    pub collection_id: CollectionId,
    /// The selection that is sent to the remote peer.
    pub selection: ankql::ast::Selection,
    /// Context data forwarded with the remote request.
    pub context_data: CD,
    /// Version number of this selection.
    pub version: u32,
}
/// Per-query bookkeeping kept by the relay.
pub struct RemoteQueryState<CD: ContextData, Q: RemoteQuerySubscriber> {
    /// Current query definition; replaced wholesale when the query is updated.
    pub content: Arc<Content<CD>>,
    /// Registration state with respect to the remote peer.
    pub status: Status,
    /// Handle used to notify the local query of success or permanent failure.
    pub livequery: Q,
}
/// Shared state behind every clone of [`SubscriptionRelay`].
struct SubscriptionRelayInner<CD: ContextData, Q: RemoteQuerySubscriber> {
    /// All queries known to the relay, keyed by query id.
    subscriptions: std::sync::Mutex<HashMap<proto::QueryId, RemoteQueryState<CD, Q>>>,
    /// Peers reported as connected via `notify_peer_connected`.
    connected_peers: SafeSet<proto::EntityId>,
    /// Node used to talk to peers; set at most once via `set_node`.
    node: OnceLock<Arc<dyn TNode<CD>>>,
    /// Sender half of the shutdown channel whose receiver is handed to the
    /// retry task in `new`. Never sent on; it only exists to be dropped.
    _shutdown_tx: tokio::sync::mpsc::Sender<()>,
}
/// Cheaply clonable handle that tracks local queries and registers them on a
/// connected remote peer.
#[derive(Clone)]
pub struct SubscriptionRelay<CD: ContextData, Q: RemoteQuerySubscriber> {
    /// State shared by all clones of this handle.
    inner: Arc<SubscriptionRelayInner<CD, Q>>,
}
/// `Default` simply delegates to [`SubscriptionRelay::new`].
impl<CD: ContextData, Q: RemoteQuerySubscriber> Default for SubscriptionRelay<CD, Q> {
    fn default() -> Self {
        Self::new()
    }
}
impl<CD: ContextData, Q: RemoteQuerySubscriber> SubscriptionRelay<CD, Q> {
/// Creates an empty relay (no node, no peers, no subscriptions) and starts
/// its periodic retry task.
pub fn new() -> Self {
    let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel(1);

    let inner = SubscriptionRelayInner {
        subscriptions: std::sync::Mutex::new(HashMap::new()),
        connected_peers: SafeSet::new(),
        node: OnceLock::new(),
        _shutdown_tx: shutdown_tx,
    };
    let relay = Self { inner: Arc::new(inner) };

    // The receiver half goes to the background retry loop.
    relay.start_retry_task(shutdown_rx);
    relay
}
/// Installs the node used for remote requests.
///
/// # Errors
/// Returns `Err(())` if a node has already been set; the rejected `node` is
/// dropped.
pub fn set_node(&self, node: Arc<dyn TNode<CD>>) -> Result<(), ()> {
    match self.inner.node.set(node) {
        Ok(()) => Ok(()),
        Err(_rejected) => Err(()),
    }
}
/// Records a new query in state `PendingRemote` (replacing any existing entry
/// with the same id) and, if at least one peer is connected, immediately runs
/// a setup pass to register it remotely.
pub fn subscribe_query(
    &self,
    query_id: proto::QueryId,
    collection_id: CollectionId,
    selection: ankql::ast::Selection,
    context_data: CD,
    version: u32,
    livequery: Q,
) {
    debug!("SubscriptionRelay.subscribe_predicate() - New predicate {} needs remote registration", query_id);

    let content = Arc::new(Content { collection_id, selection, context_data, query_id, version });
    let state = RemoteQueryState { content, status: Status::PendingRemote, livequery };
    {
        // Keep the lock scope tight: the setup pass below takes the same lock.
        let mut subscriptions = self.inner.subscriptions.lock().expect("poisoned lock");
        subscriptions.insert(query_id, state);
    }

    if self.inner.connected_peers.is_empty() {
        // Nothing to talk to yet; a later peer connection or retry tick will handle it.
        return;
    }
    self.setup_remote_subscriptions()
}
/// Replaces the selection and version of an existing query.
///
/// If the query is currently `Established` on a peer, it is marked
/// `Requested` on that same peer and the new selection is sent there.
/// In every other state (including `Failed`) it is reset to `PendingRemote`
/// and a regular setup pass is triggered.
///
/// # Errors
/// Returns an error if `query_id` is not known to the relay.
pub fn update_query(&self, query_id: proto::QueryId, selection: ankql::ast::Selection, version: u32) -> Result<(), anyhow::Error> {
    debug!("SubscriptionRelay.update_query() - New query {} needs remote registration", query_id);

    let established_target = {
        let mut subscriptions = self.inner.subscriptions.lock().expect("poisoned lock");
        let Some(state) = subscriptions.get_mut(&query_id) else {
            return Err(anyhow!("Predicate {} not found", query_id));
        };

        // Content is immutable behind its Arc, so build a fresh one that
        // carries over everything except selection and version.
        let refreshed = Content {
            collection_id: state.content.collection_id.clone(),
            selection: selection.clone(),
            context_data: state.content.context_data.clone(),
            query_id: state.content.query_id,
            version,
        };
        state.content = Arc::new(refreshed);

        let target = if let Status::Established(peer_id, _) = state.status {
            state.status = Status::Requested(peer_id, version);
            Some((peer_id, state.content.collection_id.clone(), state.content.context_data.clone()))
        } else {
            state.status = Status::PendingRemote;
            None
        };
        target
    };

    // The lock is released before any follow-up work is kicked off.
    if let Some((peer_id, collection_id, context_data)) = established_target {
        self.update_query_on_peer(peer_id, query_id, collection_id, selection, version, context_data);
    } else {
        self.setup_remote_subscriptions();
    }
    Ok(())
}
/// Spawns a task that re-sends an updated query to the peer it is already
/// registered on.
///
/// On success the local query is notified first, then the status becomes
/// `Established(peer_id, version)`. On failure `handle_error` decides between
/// retry and permanent failure. If no node has been set the task does nothing.
fn update_query_on_peer(
    &self,
    peer_id: proto::EntityId,
    query_id: proto::QueryId,
    collection_id: CollectionId,
    selection: ankql::ast::Selection,
    version: u32,
    context_data: CD,
) {
    let relay = self.clone();
    crate::task::spawn(async move {
        let Some(node) = relay.inner.node.get() else {
            return;
        };

        // Grab the subscriber handle up front; the guard must not live across an await.
        let livequery = {
            let subscriptions = relay.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());
            subscriptions.get(&query_id).map(|state| state.livequery.clone())
        };

        let outcome = node.remote_subscribe(peer_id, query_id, collection_id, selection, &context_data, version).await;
        if let Err(e) = outcome {
            relay.handle_error(query_id, peer_id, e, livequery).await;
            return;
        }

        if let Some(lq) = livequery {
            lq.subscription_established(version).await;
        }
        {
            let mut subscriptions = relay.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());
            // The query may have been unsubscribed while the request was in flight.
            if let Some(info) = subscriptions.get_mut(&query_id) {
                info.status = Status::Established(peer_id, version);
            }
        }
        debug!("Successfully updated predicate {} on peer {} subscription", query_id, peer_id);
    });
}
/// Forgets a query locally and, if it was `Established` on a peer and a node
/// is configured, spawns a best-effort unsubscribe request to that peer.
///
/// Queries in any other state are removed without contacting a peer.
pub fn unsubscribe_predicate(&self, query_id: proto::QueryId) {
    debug!("Unregistering predicate {}", query_id);

    let mut subscriptions = self.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());
    let Some(removed) = subscriptions.remove(&query_id) else {
        return;
    };
    let Status::Established(peer_id, _) = removed.status else {
        return;
    };
    let Some(node) = self.inner.node.get().cloned() else {
        return;
    };

    // Fire and forget: the outcome is only logged.
    crate::task::spawn(async move {
        match node.peer_unsubscribe(peer_id, query_id).await {
            Ok(()) => {
                debug!("Successfully sent unsubscribe message for {}", query_id);
            }
            Err(e) => {
                warn!("Failed to send unsubscribe message for {}: {}", query_id, e);
            }
        }
    });
}
/// Removes `peer_id` from the connected set, resets every query that was
/// `Established` or `Requested` on that peer to `PendingRemote`, and then
/// runs a setup pass so those queries can move to another connected peer.
pub fn notify_peer_disconnected(&self, peer_id: proto::EntityId) {
    debug!("Peer {} disconnected, orphaning predicate registrations", peer_id);
    self.inner.connected_peers.remove(&peer_id);

    {
        let mut subscriptions = self.inner.subscriptions.lock().expect("poisoned lock");
        for info in subscriptions.values_mut() {
            let bound_peer = match &info.status {
                Status::Established(p, _) | Status::Requested(p, _) => *p,
                Status::PendingRemote | Status::PendingUpdate(..) | Status::Failed => continue,
            };
            if bound_peer != peer_id {
                continue;
            }
            info.status = Status::PendingRemote;
            warn!("Predicate {} orphaned due to peer {} disconnect", info.content.query_id, peer_id);
        }
    }

    // The lock above is released before the setup pass takes it again.
    self.setup_remote_subscriptions();
}
/// Records `peer_id` as connected and runs a setup pass so that any
/// `PendingRemote` queries get registered.
pub fn notify_peer_connected(&self, peer_id: proto::EntityId) {
    debug!("SubscriptionRelay.notify_peer_connected() - Peer {} connected, registering predicates on peer subscription", peer_id);

    let peers = &self.inner.connected_peers;
    peers.insert(peer_id);

    self.setup_remote_subscriptions();
}
/// Returns a snapshot of the query's current status, or `None` if the relay
/// does not know the query.
pub fn get_status(&self, query_id: proto::QueryId) -> Option<Status> {
    let subscriptions = self.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());
    let state = subscriptions.get(&query_id)?;
    Some(state.status.clone())
}
/// Collects the distinct context data of all queries that are `Established`
/// or `Requested` on `peer_id`.
pub fn get_contexts_for_peer(&self, peer_id: &proto::EntityId) -> std::collections::HashSet<CD> {
    let subscriptions = self.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());

    let contexts: std::collections::HashSet<CD> = subscriptions
        .values()
        .filter(|state| match &state.status {
            Status::Established(bound_peer, _) | Status::Requested(bound_peer, _) => bound_peer == peer_id,
            _ => false,
        })
        .map(|state| state.content.context_data.clone())
        .collect();
    contexts
}
/// One setup pass: moves every `PendingRemote` query to `Requested` on the
/// first connected peer and spawns a subscribe attempt for each.
///
/// Does nothing (beyond a warning) when no node is set or no peer is
/// connected. Queries in any state other than `PendingRemote` are untouched.
fn setup_remote_subscriptions(&self) {
    let Some(node) = self.inner.node.get() else {
        warn!("No node configured for remote subscription setup");
        return;
    };

    let connected_peers = self.inner.connected_peers.to_vec();
    let Some(&target_peer) = connected_peers.first() else {
        warn!("No durable peers available for remote subscription setup");
        return;
    };

    // Claim the pending queries under the lock so a concurrent pass cannot
    // pick the same ones up again.
    let mut pending = Vec::new();
    {
        let mut subscriptions = self.inner.subscriptions.lock().expect("poisoned lock");
        for info in subscriptions.values_mut() {
            if matches!(info.status, Status::PendingRemote) {
                info.status = Status::Requested(target_peer, info.content.version);
                pending.push(info.content.clone());
            }
        }
    }

    if pending.is_empty() {
        return;
    }

    debug!("Registering {} predicates on {} peer subscriptions", pending.len(), self.inner.connected_peers.len());
    for content in pending {
        crate::task::spawn(self.clone().attempt_subscribe(node.clone(), target_peer, content));
    }
}
/// Sends one query to `target_peer` and records the outcome.
///
/// On success the local query is notified first, then the status becomes
/// `Established(target_peer, version)`; on failure `handle_error` classifies
/// the error.
async fn attempt_subscribe(self, node: Arc<dyn TNode<CD>>, target_peer: proto::EntityId, content: Arc<Content<CD>>) {
    let query_id = content.query_id;
    let version = content.version;
    let context_data = content.context_data.clone();
    let predicate = content.selection.clone();

    // Grab the subscriber handle up front; the guard must not live across an await.
    let livequery = {
        let subscriptions = self.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());
        subscriptions.get(&query_id).map(|state| state.livequery.clone())
    };

    let outcome = node.remote_subscribe(target_peer, query_id, content.collection_id.clone(), predicate, &context_data, version).await;
    if let Err(e) = outcome {
        self.handle_error(query_id, target_peer, e, livequery).await;
        return;
    }

    if let Some(lq) = livequery {
        lq.subscription_established(version).await;
    }
    {
        let mut subscriptions = self.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());
        // The query may have been unsubscribed while the request was in flight.
        if let Some(info) = subscriptions.get_mut(&query_id) {
            info.status = Status::Established(target_peer, version);
        }
    }
    debug!("Successfully registered predicate {} on peer {} subscription", query_id, target_peer);
}
fn start_retry_task(&self, mut shutdown_rx: tokio::sync::mpsc::Receiver<()>) {
let me = self.clone();
crate::task::spawn(async move {
loop {
let delay = futures_timer::Delay::new(std::time::Duration::from_secs(5));
tokio::select! {
_ = delay => {
me.setup_remote_subscriptions();
}
_ = shutdown_rx.recv() => {
debug!("Retry task shutting down - SubscriptionRelay dropped");
break;
}
}
}
});
}
/// Classifies a failed subscribe attempt and updates the query's status.
///
/// Connection-level request errors are retryable: the query goes back to
/// `PendingRemote` for a later setup pass. Everything else is permanent: the
/// query becomes `Failed` and the local query (if still known when the
/// attempt started) receives the error. Nothing happens if the query has
/// been removed in the meantime.
async fn handle_error(&self, query_id: proto::QueryId, target_peer: proto::EntityId, error: RetrievalError, livequery: Option<Q>) {
    // Render the message first; `error` itself may be moved into the subscriber below.
    let error_msg = error.to_string();

    let is_retryable = match &error {
        RetrievalError::RequestError(req_err) => match req_err {
            RequestError::PeerNotConnected
            | RequestError::ConnectionLost
            | RequestError::SendError(_)
            | RequestError::InternalChannelClosed => true,
            RequestError::ServerError(_) | RequestError::UnexpectedResponse(_) | RequestError::AccessDenied(_) => false,
        },
        _ => false,
    };

    let mut subscriptions = self.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());
    let Some(info) = subscriptions.get_mut(&query_id) else {
        return;
    };

    if is_retryable {
        info.status = Status::PendingRemote;
        warn!("Retryable failure for predicate {} with peer {}: {} - will retry", query_id, target_peer, error_msg);
        return;
    }

    info.status = Status::Failed;
    tracing::error!("Permanent failure for predicate {} with peer {}: {} - no retry", query_id, target_peer, error_msg);
    if let Some(lq) = livequery {
        lq.set_last_error(error);
    }
}
}
/// The slice of node functionality the relay needs, abstracted so tests can
/// substitute a mock.
#[async_trait]
pub trait TNode<CD: ContextData>: Send + Sync {
    /// Registers (or re-registers) the query on `peer_id` at the given
    /// `version`.
    async fn remote_subscribe(
        &self,
        peer_id: proto::EntityId,
        query_id: proto::QueryId,
        collection_id: CollectionId,
        selection: ankql::ast::Selection,
        context_data: &CD,
        version: u32,
    ) -> Result<(), RetrievalError>;

    /// Asks `peer_id` to drop its subscription for `query_id`.
    async fn peer_unsubscribe(&self, peer_id: proto::EntityId, query_id: proto::QueryId) -> Result<(), anyhow::Error>;
}
#[async_trait]
impl<SE, PA> TNode<PA::ContextData> for crate::node::WeakNode<SE, PA>
where
SE: crate::storage::StorageEngine + Send + Sync + 'static,
PA: crate::policy::PolicyAgent + Send + Sync + 'static,
{
async fn remote_subscribe(
&self,
peer_id: proto::EntityId,
query_id: proto::QueryId,
collection_id: CollectionId,
selection: ankql::ast::Selection,
context_data: &PA::ContextData,
version: u32,
) -> Result<(), RetrievalError> {
let node = self.upgrade().ok_or_else(|| RetrievalError::Other("Node has been dropped".to_string()))?;
let known_matches: Vec<ankurah_proto::KnownEntity> = node
.fetch_entities_from_local(&collection_id, &selection)
.await?
.into_iter()
.map(|entity| ankurah_proto::KnownEntity { entity_id: entity.id(), head: entity.head() })
.collect();
let deltas = match node
.request(
peer_id,
context_data,
ankurah_proto::NodeRequestBody::SubscribeQuery {
query_id,
collection: collection_id.clone(),
selection: selection.clone(),
version,
known_matches,
},
)
.await
.map_err(|e| RetrievalError::RequestError(e))?
{
ankurah_proto::NodeResponseBody::QuerySubscribed { query_id: _response_query_id, deltas } => deltas,
ankurah_proto::NodeResponseBody::Error(e) => return Err(RetrievalError::RequestError(RequestError::ServerError(e))),
other => return Err(RetrievalError::RequestError(RequestError::UnexpectedResponse(other))),
};
tracing::debug!(
"Node.remote_subscribe: query_id: {}, collection_id: {}, received deltas: {}",
query_id,
collection_id,
deltas.len()
);
let retriever = crate::retrieval::EphemeralNodeRetriever::new(collection_id, &node, context_data);
let apply_result = crate::node_applier::NodeApplier::apply_deltas(&node, &peer_id, deltas, &retriever).await;
let event_store_result = retriever.store_used_events().await;
apply_result?; event_store_result?;
Ok(())
}
async fn peer_unsubscribe(&self, peer_id: proto::EntityId, query_id: proto::QueryId) -> Result<(), anyhow::Error> {
let node = self.upgrade().ok_or_else(|| anyhow!("Node has been dropped"))?;
node.request_remote_unsubscribe(query_id, vec![peer_id]).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use ankql::ast::Predicate;
use ankurah_proto::EntityId;
use std::sync::{Arc, Mutex};
// Test-only: lets a plain `CollectionId` stand in as the relay's context data,
// so the tests below can pass `collection_id.clone()` where a `CD` is expected.
impl ContextData for CollectionId {}
#[derive(Debug)]
struct MockMessageSender<CD: ContextData> {
next_error: Arc<Mutex<Option<RequestError>>>,
sent_requests: Arc<Mutex<Vec<(EntityId, proto::QueryId, CollectionId, ankql::ast::Selection)>>>,
should_fail: Arc<Mutex<bool>>,
failure_message: Arc<Mutex<String>>,
_phantom: std::marker::PhantomData<CD>,
}
impl<CD: ContextData> MockMessageSender<CD> {
fn new() -> Self {
Self {
sent_requests: Arc::new(Mutex::new(Vec::new())),
next_error: Arc::new(Mutex::new(None)),
should_fail: Arc::new(Mutex::new(false)),
failure_message: Arc::new(Mutex::new(String::new())),
_phantom: std::marker::PhantomData,
}
}
fn set_fail_next(&self, error: RequestError) { *self.next_error.lock().unwrap() = Some(error); }
fn get_sent_requests(&self) -> Vec<(EntityId, proto::QueryId, CollectionId, ankql::ast::Selection)> {
self.sent_requests.lock().unwrap().clone()
}
fn clear_sent_requests(&self) { self.sent_requests.lock().unwrap().clear(); }
}
#[async_trait]
impl<CD: ContextData> TNode<CD> for MockMessageSender<CD> {
async fn remote_subscribe(
&self,
peer_id: EntityId,
query_id: proto::QueryId,
collection_id: CollectionId,
selection: ankql::ast::Selection,
_context_data: &CD,
_version: u32,
) -> Result<(), RetrievalError> {
self.sent_requests.lock().unwrap().push((peer_id, query_id, collection_id.clone(), selection.clone()));
if let Some(error) = self.next_error.lock().unwrap().take() {
Err(RetrievalError::RequestError(error))
} else {
Ok(())
}
}
async fn peer_unsubscribe(&self, peer_id: EntityId, query_id: proto::QueryId) -> Result<(), anyhow::Error> {
self.sent_requests.lock().unwrap().push((
peer_id,
query_id,
CollectionId::from("unsubscribe"),
ankql::ast::Selection { predicate: ankql::ast::Predicate::True, order_by: None, limit: None },
));
if let Some(error) = self.next_error.lock().unwrap().take() {
Err(anyhow!(error.to_string()))
} else {
Ok(())
}
}
}
#[derive(Clone)]
struct MockLiveQuery;
#[async_trait::async_trait]
impl RemoteQuerySubscriber for MockLiveQuery {
async fn subscription_established(&self, _version: u32) {
}
fn set_last_error(&self, _error: RetrievalError) {
}
}
/// Builds the match-everything selection used throughout the tests: predicate
/// `True`, no ordering, no limit.
fn create_test_selection() -> ankql::ast::Selection {
    let predicate = ankql::ast::Predicate::True;
    ankql::ast::Selection { predicate, order_by: None, limit: None }
}
/// Returns the collection id shared by all tests.
fn create_test_collection_id() -> CollectionId {
    let name = "test_collection";
    CollectionId::from(name)
}
#[tokio::test]
async fn test_new_subscription_setup() {
let relay = SubscriptionRelay::new();
let mock_sender = Arc::new(MockMessageSender::<CollectionId>::new());
relay.set_node(mock_sender.clone()).expect("Failed to set message sender");
let query_id = proto::QueryId::new();
let collection_id = create_test_collection_id();
let predicate = create_test_selection();
let peer_id = EntityId::new();
relay.notify_peer_connected(peer_id);
relay.subscribe_query(query_id, collection_id.clone(), predicate.clone(), collection_id.clone(), 0, MockLiveQuery);
assert!(matches!(relay.get_status(query_id), Some(Status::Requested(_, _))));
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
let sent_requests = mock_sender.get_sent_requests();
assert_eq!(sent_requests.len(), 1);
assert_eq!(sent_requests[0].0, peer_id);
assert_eq!(sent_requests[0].1, query_id);
assert_eq!(sent_requests[0].2, collection_id);
assert!(matches!(relay.get_status(query_id), Some(Status::Established(established_peer_id, _)) if established_peer_id == peer_id));
}
#[tokio::test]
async fn test_peer_disconnection_orphans_subscriptions() {
let relay = SubscriptionRelay::new();
let mock_sender = Arc::new(MockMessageSender::<CollectionId>::new());
relay.set_node(mock_sender.clone()).expect("Failed to set message sender");
let query_id = proto::QueryId::new();
let collection_id = create_test_collection_id();
let predicate = create_test_selection();
let peer_id = EntityId::new();
relay.notify_peer_connected(peer_id);
relay.subscribe_query(query_id, collection_id.clone(), predicate, collection_id.clone(), 0, MockLiveQuery);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
assert!(matches!(relay.get_status(query_id), Some(Status::Established(established_peer_id, _)) if established_peer_id == peer_id));
relay.notify_peer_disconnected(peer_id);
assert!(matches!(relay.get_status(query_id), Some(Status::PendingRemote)));
}
#[tokio::test]
async fn test_peer_connection_triggers_setup() {
let relay = SubscriptionRelay::new();
let mock_sender = Arc::new(MockMessageSender::<CollectionId>::new());
relay.set_node(mock_sender.clone()).expect("Failed to set message sender");
let query_id = proto::QueryId::new();
let collection_id = create_test_collection_id();
let predicate = create_test_selection();
let peer_id = EntityId::new();
relay.subscribe_query(query_id, collection_id.clone(), predicate.clone(), collection_id.clone(), 0, MockLiveQuery);
assert!(matches!(relay.get_status(query_id), Some(Status::PendingRemote)));
mock_sender.clear_sent_requests();
relay.notify_peer_connected(peer_id);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
let sent_requests = mock_sender.get_sent_requests();
assert_eq!(sent_requests.len(), 1);
assert_eq!(sent_requests[0].0, peer_id);
assert_eq!(sent_requests[0].1, query_id);
assert!(matches!(relay.get_status(query_id), Some(Status::Established(established_peer_id, _)) if established_peer_id == peer_id));
}
#[tokio::test]
async fn test_failed_subscription_retry() {
let relay = SubscriptionRelay::new();
let mock_sender = Arc::new(MockMessageSender::<CollectionId>::new());
relay.set_node(mock_sender.clone()).expect("Failed to set message sender");
let query_id = proto::QueryId::new();
let collection_id = create_test_collection_id();
let predicate = create_test_selection();
let peer_id = EntityId::new();
relay.notify_peer_connected(peer_id);
relay.subscribe_query(query_id, collection_id.clone(), predicate.clone(), collection_id.clone(), 0, MockLiveQuery);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
assert!(matches!(relay.get_status(query_id), Some(Status::Established(established_peer_id, _)) if established_peer_id == peer_id));
relay.notify_peer_disconnected(peer_id);
assert!(matches!(relay.get_status(query_id), Some(Status::PendingRemote)));
mock_sender.clear_sent_requests();
mock_sender.set_fail_next(RequestError::ServerError("Invalid predicate".to_string()));
relay.notify_peer_connected(peer_id);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
let sent_requests = mock_sender.get_sent_requests();
assert_eq!(sent_requests.len(), 1);
assert!(matches!(relay.get_status(query_id), Some(Status::Failed)));
}
#[tokio::test]
async fn test_retryable_vs_non_retryable_failures() {
let relay = SubscriptionRelay::new();
let mock_sender = Arc::new(MockMessageSender::<CollectionId>::new());
relay.set_node(mock_sender.clone()).expect("Failed to set message sender");
let retryable_query_id = proto::QueryId::new();
let non_retryable_query_id = proto::QueryId::new();
let collection_id = create_test_collection_id();
let predicate = create_test_selection();
let peer_id = EntityId::new();
relay.subscribe_query(retryable_query_id, collection_id.clone(), predicate.clone(), collection_id.clone(), 0, MockLiveQuery);
relay.subscribe_query(non_retryable_query_id, collection_id.clone(), predicate.clone(), collection_id.clone(), 0, MockLiveQuery);
{
let mut subscriptions = relay.inner.subscriptions.lock().unwrap_or_else(|e| e.into_inner());
if let Some(info) = subscriptions.get_mut(&retryable_query_id) {
info.status = Status::PendingRemote; }
if let Some(info) = subscriptions.get_mut(&non_retryable_query_id) {
info.status = Status::Failed; }
}
relay.notify_peer_connected(peer_id);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
let sent_requests = mock_sender.get_sent_requests();
assert_eq!(sent_requests.len(), 1);
assert_eq!(sent_requests[0].1, retryable_query_id);
assert!(
matches!(relay.get_status(retryable_query_id), Some(Status::Established(established_peer_id, _)) if established_peer_id == peer_id)
);
assert!(matches!(relay.get_status(non_retryable_query_id), Some(Status::Failed)));
}
#[tokio::test]
async fn test_subscription_removal() {
let relay = SubscriptionRelay::new();
let mock_sender = Arc::new(MockMessageSender::<CollectionId>::new());
relay.set_node(mock_sender.clone()).expect("Failed to set message sender");
let query_id = proto::QueryId::new();
let collection_id = create_test_collection_id();
let predicate = create_test_selection();
let peer_id = EntityId::new();
relay.notify_peer_connected(peer_id);
relay.subscribe_query(query_id, collection_id.clone(), predicate, collection_id.clone(), 0, MockLiveQuery);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
assert!(matches!(relay.get_status(query_id), Some(Status::Established(established_peer_id, _)) if established_peer_id == peer_id));
mock_sender.clear_sent_requests();
relay.unsubscribe_predicate(query_id);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
let sent_requests = mock_sender.get_sent_requests();
assert_eq!(sent_requests.len(), 1);
assert_eq!(sent_requests[0].0, peer_id);
assert_eq!(sent_requests[0].1, query_id);
assert!(matches!(relay.get_status(query_id), None));
}
#[tokio::test]
async fn test_edge_cases() {
let relay = SubscriptionRelay::new();
let mock_sender = Arc::new(MockMessageSender::<CollectionId>::new());
let query_id = proto::QueryId::new();
let collection_id = create_test_collection_id();
let predicate = create_test_selection();
let peer_id = EntityId::new();
relay.subscribe_query(query_id, collection_id.clone(), predicate.clone(), collection_id.clone(), 0, MockLiveQuery);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
assert!(matches!(relay.get_status(query_id), Some(Status::PendingRemote)));
relay.set_node(mock_sender.clone()).expect("Failed to set message sender");
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
assert!(matches!(relay.get_status(query_id), Some(Status::PendingRemote)));
assert_eq!(mock_sender.get_sent_requests().len(), 0);
relay.notify_peer_connected(peer_id);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
assert!(matches!(relay.get_status(query_id), Some(Status::Established(established_peer_id, _)) if established_peer_id == peer_id));
assert_eq!(mock_sender.get_sent_requests().len(), 1);
}
#[tokio::test]
async fn test_notify_unsubscribe_with_no_established_subscription() {
let relay = SubscriptionRelay::new();
let mock_sender = Arc::new(MockMessageSender::<CollectionId>::new());
relay.set_node(mock_sender.clone()).expect("Failed to set message sender");
let query_id = proto::QueryId::new();
let collection_id = create_test_collection_id();
let predicate = create_test_selection();
relay.subscribe_query(query_id, collection_id.clone(), predicate, collection_id.clone(), 0, MockLiveQuery);
assert!(matches!(relay.get_status(query_id), Some(Status::PendingRemote)));
relay.unsubscribe_predicate(query_id);
futures_timer::Delay::new(std::time::Duration::from_millis(10)).await;
let sent_requests = mock_sender.get_sent_requests();
assert_eq!(sent_requests.len(), 0);
assert!(matches!(relay.get_status(query_id), None));
}
}