use crate::wire::{VShardEnvelope, VShardMessageType};
pub enum HandleResult {
Response(VShardEnvelope),
NoResponse,
Error(String),
}
pub trait VShardHandler: Send + Sync + 'static {
fn handle_vshard_envelope(
&self,
envelope: VShardEnvelope,
) -> impl std::future::Future<Output = HandleResult> + Send;
}
pub fn dispatch_by_type(envelope: &VShardEnvelope) -> DispatchTarget {
match envelope.msg_type {
VShardMessageType::GraphAlgoSuperstep => DispatchTarget::GraphBsp,
VShardMessageType::GraphAlgoContributions => DispatchTarget::GraphBsp,
VShardMessageType::GraphAlgoSuperstepAck => DispatchTarget::GraphBsp,
VShardMessageType::GraphAlgoComplete => DispatchTarget::GraphBsp,
VShardMessageType::TsScatterRequest => DispatchTarget::TimeseriesScan,
VShardMessageType::TsScatterResponse => DispatchTarget::TimeseriesCoordinator,
VShardMessageType::TsRetentionCommand => DispatchTarget::TimeseriesRetention,
VShardMessageType::TsRetentionAck => DispatchTarget::TimeseriesCoordinator,
VShardMessageType::TsArchiveCommand => DispatchTarget::TimeseriesArchive,
VShardMessageType::TsArchiveAck => DispatchTarget::TimeseriesCoordinator,
VShardMessageType::SegmentChunk
| VShardMessageType::SegmentComplete
| VShardMessageType::WalTail
| VShardMessageType::RoutingUpdate
| VShardMessageType::RoutingAck => DispatchTarget::Migration,
VShardMessageType::GhostCreate
| VShardMessageType::GhostDelete
| VShardMessageType::GhostVerifyRequest
| VShardMessageType::GhostVerifyResponse => DispatchTarget::Ghost,
VShardMessageType::MigrationBaseCopy => DispatchTarget::Migration,
VShardMessageType::CrossShardForward => DispatchTarget::Forward,
VShardMessageType::GsiForward => DispatchTarget::Forward,
VShardMessageType::EdgeValidation => DispatchTarget::GraphValidation,
VShardMessageType::VectorScatterRequest => DispatchTarget::VectorSearch,
VShardMessageType::VectorScatterResponse => DispatchTarget::VectorCoordinator,
VShardMessageType::SpatialScatterRequest => DispatchTarget::SpatialSearch,
VShardMessageType::SpatialScatterResponse => DispatchTarget::SpatialCoordinator,
VShardMessageType::CrossShardEvent => DispatchTarget::EventPlane,
VShardMessageType::CrossShardEventAck => DispatchTarget::EventPlane,
VShardMessageType::NotifyBroadcast => DispatchTarget::EventPlane,
VShardMessageType::NotifyBroadcastAck => DispatchTarget::EventPlane,
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchTarget {
GraphBsp,
GraphValidation,
TimeseriesScan,
TimeseriesCoordinator,
TimeseriesRetention,
TimeseriesArchive,
VectorSearch,
VectorCoordinator,
Migration,
Ghost,
Forward,
SpatialSearch,
SpatialCoordinator,
EventPlane,
}
pub fn build_ts_scatter_response(
source_node: u64,
target_node: u64,
vshard_id: u16,
partials_json: &[u8],
) -> VShardEnvelope {
VShardEnvelope::new(
VShardMessageType::TsScatterResponse,
source_node,
target_node,
vshard_id,
partials_json.to_vec(),
)
}
pub fn build_ts_retention_ack(
source_node: u64,
target_node: u64,
vshard_id: u16,
result_json: &[u8],
) -> VShardEnvelope {
VShardEnvelope::new(
VShardMessageType::TsRetentionAck,
source_node,
target_node,
vshard_id,
result_json.to_vec(),
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn dispatch_graph_bsp() {
let env = VShardEnvelope::new(VShardMessageType::GraphAlgoSuperstep, 1, 2, 42, vec![]);
assert_eq!(dispatch_by_type(&env), DispatchTarget::GraphBsp);
}
#[test]
fn dispatch_ts_scatter() {
let env = VShardEnvelope::new(VShardMessageType::TsScatterRequest, 1, 2, 42, vec![]);
assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesScan);
}
#[test]
fn dispatch_ts_retention() {
let env = VShardEnvelope::new(VShardMessageType::TsRetentionCommand, 1, 2, 42, vec![]);
assert_eq!(dispatch_by_type(&env), DispatchTarget::TimeseriesRetention);
}
#[test]
fn dispatch_migration() {
let env = VShardEnvelope::new(VShardMessageType::SegmentChunk, 1, 2, 42, vec![]);
assert_eq!(dispatch_by_type(&env), DispatchTarget::Migration);
}
#[test]
fn all_message_types_dispatched() {
let all_types = [
VShardMessageType::SegmentChunk,
VShardMessageType::SegmentComplete,
VShardMessageType::WalTail,
VShardMessageType::RoutingUpdate,
VShardMessageType::RoutingAck,
VShardMessageType::GhostCreate,
VShardMessageType::GhostDelete,
VShardMessageType::GhostVerifyRequest,
VShardMessageType::GhostVerifyResponse,
VShardMessageType::MigrationBaseCopy,
VShardMessageType::CrossShardForward,
VShardMessageType::GsiForward,
VShardMessageType::EdgeValidation,
VShardMessageType::GraphAlgoSuperstep,
VShardMessageType::GraphAlgoContributions,
VShardMessageType::GraphAlgoSuperstepAck,
VShardMessageType::GraphAlgoComplete,
VShardMessageType::TsScatterRequest,
VShardMessageType::TsScatterResponse,
VShardMessageType::TsRetentionCommand,
VShardMessageType::TsRetentionAck,
VShardMessageType::TsArchiveCommand,
VShardMessageType::TsArchiveAck,
VShardMessageType::VectorScatterRequest,
VShardMessageType::VectorScatterResponse,
VShardMessageType::SpatialScatterRequest,
VShardMessageType::SpatialScatterResponse,
VShardMessageType::CrossShardEvent,
VShardMessageType::CrossShardEventAck,
VShardMessageType::NotifyBroadcast,
VShardMessageType::NotifyBroadcastAck,
];
for msg_type in all_types {
let env = VShardEnvelope::new(msg_type, 1, 2, 0, vec![]);
let _ = dispatch_by_type(&env); }
}
}