//! tarpc service trait definitions (`phago_distributed/rpc/protocol.rs`).
//!
//! This module defines the RPC service interfaces for distributed
//! colony coordination using tarpc's procedural macro system.

6use crate::types::{
7    GhostNode, LocalQueryRequest, LocalQueryResult, PhaseResult, ShardHealth, ShardId, ShardInfo,
8    TickPhase,
9};
10use phago_core::types::{Document, DocumentId, NodeData, NodeId};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14/// Result type for RPC operations that needs to be serializable.
15pub type RpcResult<T> = Result<T, RpcError>;
16
17/// Serializable error type for RPC calls.
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum RpcError {
20    /// Shard not found.
21    ShardNotFound(u32),
22    /// Coordinator unavailable.
23    CoordinatorUnavailable,
24    /// RPC call failed.
25    RpcFailed(String),
26    /// Phase timeout.
27    PhaseTimeout(String),
28    /// Document routing failed.
29    RoutingFailed,
30    /// Edge resolution failed.
31    EdgeResolutionFailed,
32    /// Ghost node not found.
33    GhostNodeNotFound,
34    /// Barrier synchronization failed.
35    BarrierFailed,
36    /// Internal error.
37    Internal(String),
38}
39
40impl std::fmt::Display for RpcError {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        match self {
43            RpcError::ShardNotFound(id) => write!(f, "Shard {} not found", id),
44            RpcError::CoordinatorUnavailable => write!(f, "Coordinator unavailable"),
45            RpcError::RpcFailed(msg) => write!(f, "RPC failed: {}", msg),
46            RpcError::PhaseTimeout(phase) => write!(f, "Phase timeout: {}", phase),
47            RpcError::RoutingFailed => write!(f, "Document routing failed"),
48            RpcError::EdgeResolutionFailed => write!(f, "Edge resolution failed"),
49            RpcError::GhostNodeNotFound => write!(f, "Ghost node not found"),
50            RpcError::BarrierFailed => write!(f, "Barrier synchronization failed"),
51            RpcError::Internal(msg) => write!(f, "Internal error: {}", msg),
52        }
53    }
54}
55
56impl std::error::Error for RpcError {}
57
/// Service provided by each shard in the distributed colony.
///
/// This service handles document ingestion, tick phase execution,
/// local queries, and cross-shard coordination.
///
/// NOTE(review): `#[tarpc::service]` generates the client stub and the
/// request/response enums from these method signatures. Renaming, reordering,
/// or changing the parameters of a method therefore likely changes the wire
/// format (how much depends on the serde codec in use). Deploy shards and the
/// coordinator in lockstep when editing this trait.
#[tarpc::service]
pub trait ShardService {
    /// Ingest a document into this shard.
    ///
    /// The document will be processed by local agents during the next tick.
    /// Returns the document ID assigned to the ingested document.
    async fn ingest_document(doc: Document) -> RpcResult<DocumentId>;

    /// Execute one phase (`phase`) of tick number `tick` on this shard.
    ///
    /// Phases are executed in order: Sense -> Act -> Decay -> Advance.
    /// Each phase must complete on all shards before the next phase begins (barrier sync).
    ///
    /// NOTE(review): `receive_signals` below refers to an "Exchange" phase that
    /// is missing from this list. Confirm the actual order against `TickPhase`.
    async fn tick_phase(phase: TickPhase, tick: u64) -> RpcResult<PhaseResult>;

    /// Execute a local query (part of distributed query).
    ///
    /// Returns matching nodes from this shard's portion of the graph.
    /// Results are combined by the coordinator using scatter-gather.
    async fn local_query(req: LocalQueryRequest) -> RpcResult<LocalQueryResult>;

    /// Get document frequencies for `terms`, as input to global DF computation.
    ///
    /// Despite the method name, the returned map is term -> *document*
    /// frequency, which feeds global DF aggregation for TF-IDF scoring.
    ///
    /// NOTE(review): presumably counted over this shard's documents only, with
    /// the coordinator's `get_global_df` combining the per-shard maps.
    /// It is also unclear whether terms absent here are omitted or mapped to 0.
    /// Confirm against the implementation.
    async fn get_term_frequencies(terms: Vec<String>) -> RpcResult<HashMap<String, u64>>;

    /// Fetch a node's full data (for ghost node resolution).
    ///
    /// Used when a shard needs detailed information about a node
    /// that exists on another shard.
    ///
    /// NOTE(review): `Ok(None)` presumably means the node is not stored on
    /// this shard. Confirm.
    async fn get_node(id: NodeId) -> RpcResult<Option<NodeData>>;

    /// Health check.
    ///
    /// Returns the current health status of this shard including
    /// resource usage and processing statistics.
    async fn health_check() -> RpcResult<ShardHealth>;

    /// Resolve cross-shard edges by fetching ghost node data.
    ///
    /// Batch operation to fetch data for multiple nodes at once,
    /// creating ghost node representations for cross-shard edges.
    ///
    /// NOTE(review): it is not visible here whether the result is
    /// index-aligned with `node_ids` or skips unknown IDs. Confirm before
    /// relying on positional correspondence.
    async fn resolve_ghost_nodes(node_ids: Vec<NodeId>) -> RpcResult<Vec<GhostNode>>;

    /// Get the list of nodes connected to a given node.
    ///
    /// Returns node IDs of all neighbors, including cross-shard references.
    async fn get_neighbors(node_id: NodeId) -> RpcResult<Vec<NodeId>>;

    /// Receive cross-shard signals during the Exchange phase.
    ///
    /// Signals from other shards are delivered here for local processing.
    async fn receive_signals(signals: Vec<crate::rpc::messages::CrossShardSignal>)
        -> RpcResult<()>;
}
117
/// Service provided by the coordinator.
///
/// The coordinator manages shard registration, tick synchronization,
/// query distribution, and global state aggregation.
///
/// NOTE(review): as with any `#[tarpc::service]` trait, the generated
/// request/response enums mirror these signatures, so edits here likely
/// affect the wire format between shards and the coordinator.
///
/// `route_document`, `route_node`, `current_tick` and `list_shards` return
/// bare values rather than [`RpcResult`]. The service therefore cannot
/// report an application-level error from them. Only transport failures
/// surface, through the tarpc client's own error type.
#[tarpc::service]
pub trait CoordinatorService {
    /// Register a new shard with the coordinator.
    ///
    /// Returns the assigned shard ID. The coordinator will include
    /// this shard in subsequent tick coordination.
    async fn register(info: ShardInfo) -> RpcResult<ShardId>;

    /// Unregister a shard from the coordinator.
    ///
    /// Should be called during graceful shutdown. The coordinator
    /// will stop routing requests to this shard.
    async fn unregister(shard_id: ShardId) -> RpcResult<()>;

    /// Report that shard `shard_id` has completed `phase` of tick `tick`.
    ///
    /// Used for barrier synchronization. The coordinator tracks
    /// completion across all shards before advancing to the next phase.
    async fn phase_complete(shard_id: ShardId, phase: TickPhase, tick: u64) -> RpcResult<()>;

    /// Get the shard responsible for a document.
    ///
    /// Uses consistent hashing to determine which shard owns a document.
    ///
    /// NOTE(review): because this returns a bare `ShardId`,
    /// `RpcError::RoutingFailed` can never be reported here. Confirm what the
    /// implementation returns when no shards are registered.
    async fn route_document(doc_id: DocumentId) -> ShardId;

    /// Get the shard responsible for a node.
    ///
    /// Uses consistent hashing based on node ID.
    async fn route_node(node_id: NodeId) -> ShardId;

    /// Get global document frequencies for TF-IDF.
    ///
    /// Returns aggregated term frequencies across all shards.
    async fn get_global_df(terms: Vec<String>) -> RpcResult<HashMap<String, u64>>;

    /// Signal ready for next phase (barrier).
    ///
    /// Returns `Ok(true)` when all shards are ready and the phase can proceed,
    /// and `Ok(false)` while that is not yet the case.
    ///
    /// NOTE(review): presumably callers poll this until it returns `true`.
    /// How it relates to `phase_complete` (same arguments) is not visible
    /// here. Confirm which of the two the barrier actually counts.
    async fn barrier_ready(shard_id: ShardId, phase: TickPhase, tick: u64) -> RpcResult<bool>;

    /// Get current tick number.
    ///
    /// Returns the global tick counter maintained by the coordinator.
    async fn current_tick() -> u64;

    /// Get all registered shards.
    ///
    /// Returns information about all shards in the cluster.
    async fn list_shards() -> Vec<ShardInfo>;

    /// Request to start a new tick.
    ///
    /// Only succeeds if no tick is currently in progress.
    ///
    /// NOTE(review): the returned `u64` is presumably the number of the newly
    /// started tick. Confirm.
    async fn start_tick() -> RpcResult<u64>;

    /// Get the status of the current tick.
    ///
    /// Returns the current phase and which shards have completed it.
    async fn tick_status() -> RpcResult<TickStatus>;
}
182
183/// Status of the current tick across all shards.
184#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
185pub struct TickStatus {
186    /// Current tick number.
187    pub tick: u64,
188    /// Current phase being executed.
189    pub phase: TickPhase,
190    /// Shards that have completed the current phase.
191    pub completed_shards: Vec<ShardId>,
192    /// Shards that are still processing.
193    pub pending_shards: Vec<ShardId>,
194    /// Whether the tick is complete.
195    pub tick_complete: bool,
196}