use std::net::SocketAddr;
use std::sync::Arc;
use d_engine_core::client::ClientResponse;
use d_engine_proto::client::MembershipSnapshot;
use d_engine_proto::client::WatchResponse;
use d_engine_proto::client::raft_client_service_server::RaftClientServiceServer;
use d_engine_proto::common::NodeRole;
use d_engine_proto::common::NodeStatus;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::NodeMeta;
use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementServiceServer;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;
use tonic_health::server::health_reporter;
use tracing::debug;
use tracing::info;
use crate::mock_rpc::MockRpcService;
pub struct MockNode;
impl MockNode {
pub async fn mock_listener(
mut mock_service: MockRpcService,
rx: oneshot::Receiver<()>,
is_ready: bool,
) -> std::result::Result<(u16, SocketAddr), tonic::Status> {
let (health_reporter, health_service) = health_reporter();
if is_ready {
health_reporter.set_serving::<RaftClientServiceServer<MockNode>>().await;
health_reporter.set_serving::<ClusterManagementServiceServer<MockNode>>().await;
info!("set service is serving");
} else {
health_reporter.set_not_serving::<RaftClientServiceServer<MockNode>>().await;
health_reporter
.set_not_serving::<ClusterManagementServiceServer<MockNode>>()
.await;
info!("set service is not serving");
}
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener
.local_addr()
.map_err(|e| tonic::Status::internal(format!("Failed to bind: {e}")))?;
let port = addr.port();
debug!("starting mock rpc service:port={port}",);
mock_service.set_port(port);
let mock_service = Arc::new(mock_service);
let _r = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(health_service)
.add_service(
RaftClientServiceServer::from_arc(mock_service.clone())
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
)
.add_service(
ClusterManagementServiceServer::from_arc(mock_service.clone())
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
)
.serve_with_incoming_shutdown(
tokio_stream::wrappers::TcpListenerStream::new(listener),
async {
rx.await.ok();
},
)
.await
.unwrap();
});
Ok((port, addr)) }
pub(crate) async fn mock_channel_with_port(port: u16) -> Channel {
Channel::from_shared(format!("http://127.0.0.1:{port}"))
.expect("valid address")
.connect()
.await
.expect("connection failed")
}
#[allow(clippy::type_complexity)]
pub(crate) async fn simulate_mock_service_with_cluster_conf_reps(
rx: oneshot::Receiver<()>,
response_builder: Option<
Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
>,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let builder = response_builder.unwrap_or_else(|| {
Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}",),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
})
});
let mock_service = MockRpcService::default().with_metadata_response(builder);
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
#[allow(clippy::type_complexity)]
pub(crate) async fn simulate_client_read_mock_server(
rx: oneshot::Receiver<()>,
metadata_response_builder: Option<
Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
>,
response: ClientResponse,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let builder = metadata_response_builder.unwrap_or_else(|| {
Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}",),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
})
});
let mock_service = MockRpcService {
expected_metadata_response: Some(Arc::new(builder)),
expected_client_read_response: Some(Ok(response)),
..Default::default()
};
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
#[allow(clippy::type_complexity)]
pub(crate) async fn simulate_client_write_mock_server(
rx: oneshot::Receiver<()>,
metadata_response_builder: Option<
Box<dyn Fn(u16) -> std::result::Result<ClusterMembership, tonic::Status> + Send + Sync>,
>,
response: ClientResponse,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let builder = metadata_response_builder.unwrap_or_else(|| {
Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}",),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
})
});
let mock_service = MockRpcService {
expected_metadata_response: Some(Arc::new(builder)),
expected_client_propose_response: Some(Ok(response)),
..Default::default()
};
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
#[allow(clippy::type_complexity)]
pub(crate) async fn simulate_cas_mock_server(
rx: oneshot::Receiver<()>,
cas_should_succeed: bool,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let response = if cas_should_succeed {
ClientResponse::write_success()
} else {
ClientResponse::cas_failure()
};
let builder = Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}"),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
});
let mock_service = MockRpcService {
expected_metadata_response: Some(Arc::new(builder)),
expected_client_propose_response: Some(Ok(response)),
..Default::default()
};
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
pub(crate) async fn simulate_watch_mock_server(
rx: oneshot::Receiver<()>,
events: Vec<WatchResponse>,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let builder = Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}"),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
});
let mock_service = crate::mock_rpc::MockRpcService {
expected_metadata_response: Some(Arc::new(builder)),
expected_watch_events: Some(Ok(events)),
..Default::default()
};
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
pub(crate) async fn simulate_watch_error_mock_server(
rx: oneshot::Receiver<()>,
status: tonic::Status,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let builder = Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}"),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
});
let mock_service = crate::mock_rpc::MockRpcService {
expected_metadata_response: Some(Arc::new(builder)),
expected_watch_events: Some(Err(status)),
..Default::default()
};
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
pub(crate) async fn simulate_watch_membership_mock_server(
rx: oneshot::Receiver<()>,
snapshots: Vec<MembershipSnapshot>,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let builder = Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}"),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
});
let mock_service = crate::mock_rpc::MockRpcService {
expected_metadata_response: Some(Arc::new(builder)),
expected_watch_membership_events: Some(Ok(snapshots)),
..Default::default()
};
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
pub(crate) async fn simulate_watch_membership_error_mock_server(
rx: oneshot::Receiver<()>,
status: tonic::Status,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let builder = Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}"),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
});
let mock_service = crate::mock_rpc::MockRpcService {
expected_metadata_response: Some(Arc::new(builder)),
expected_watch_membership_events: Some(Err(status)),
..Default::default()
};
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
pub(crate) async fn simulate_scan_mock_server(
rx: oneshot::Receiver<()>,
response: d_engine_proto::client::ScanResponse,
) -> std::result::Result<(Channel, u16), tonic::Status> {
let builder = Box::new(|port: u16| {
Ok(ClusterMembership {
version: 1,
nodes: vec![NodeMeta {
id: 1,
role: NodeRole::Leader as i32,
address: format!("127.0.0.1:{port}"),
status: NodeStatus::Active.into(),
}],
current_leader_id: Some(1),
})
});
let mock_service = crate::mock_rpc::MockRpcService {
expected_metadata_response: Some(Arc::new(builder)),
expected_client_scan_response: Some(Ok(response)),
..Default::default()
};
let (port, _addr) = Self::mock_listener(mock_service, rx, true).await?;
let channel = Self::mock_channel_with_port(port).await;
Ok((channel, port))
}
}