phago_distributed/rpc/protocol.rs
//! tarpc service trait definitions.
//!
//! This module defines the RPC service interfaces for distributed
//! colony coordination, using tarpc's `#[tarpc::service]` procedural macro.
5
6use crate::types::{
7 GhostNode, LocalQueryRequest, LocalQueryResult, PhaseResult, ShardHealth, ShardId, ShardInfo,
8 TickPhase,
9};
10use phago_core::types::{Document, DocumentId, NodeData, NodeId};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
14/// Result type for RPC operations that needs to be serializable.
15pub type RpcResult<T> = Result<T, RpcError>;
16
17/// Serializable error type for RPC calls.
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub enum RpcError {
20 /// Shard not found.
21 ShardNotFound(u32),
22 /// Coordinator unavailable.
23 CoordinatorUnavailable,
24 /// RPC call failed.
25 RpcFailed(String),
26 /// Phase timeout.
27 PhaseTimeout(String),
28 /// Document routing failed.
29 RoutingFailed,
30 /// Edge resolution failed.
31 EdgeResolutionFailed,
32 /// Ghost node not found.
33 GhostNodeNotFound,
34 /// Barrier synchronization failed.
35 BarrierFailed,
36 /// Internal error.
37 Internal(String),
38}
39
40impl std::fmt::Display for RpcError {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 RpcError::ShardNotFound(id) => write!(f, "Shard {} not found", id),
44 RpcError::CoordinatorUnavailable => write!(f, "Coordinator unavailable"),
45 RpcError::RpcFailed(msg) => write!(f, "RPC failed: {}", msg),
46 RpcError::PhaseTimeout(phase) => write!(f, "Phase timeout: {}", phase),
47 RpcError::RoutingFailed => write!(f, "Document routing failed"),
48 RpcError::EdgeResolutionFailed => write!(f, "Edge resolution failed"),
49 RpcError::GhostNodeNotFound => write!(f, "Ghost node not found"),
50 RpcError::BarrierFailed => write!(f, "Barrier synchronization failed"),
51 RpcError::Internal(msg) => write!(f, "Internal error: {}", msg),
52 }
53 }
54}
55
56impl std::error::Error for RpcError {}
57
/// Service provided by each shard in the distributed colony.
///
/// This service handles document ingestion, tick phase execution,
/// local queries, and cross-shard coordination.
// NOTE(review): `#[tarpc::service]` derives the request/response message types
// from this trait, so method order and parameter names are presumably part of
// the wire protocol. Confirm before reordering or renaming anything here.
#[tarpc::service]
pub trait ShardService {
    /// Ingest a document into this shard.
    ///
    /// The document will be processed by local agents during the next tick.
    /// Returns the document ID assigned to the ingested document.
    async fn ingest_document(doc: Document) -> RpcResult<DocumentId>;

    /// Execute a tick phase on this shard.
    ///
    /// Phases are executed in order: Sense -> Act -> Decay -> Advance.
    /// Each phase must complete on all shards before the next phase begins (barrier sync).
    /// `tick` is the tick number that `phase` belongs to.
    // NOTE(review): `receive_signals` below refers to an "Exchange" phase that the
    // list above omits; confirm the full ordering against `TickPhase`.
    async fn tick_phase(phase: TickPhase, tick: u64) -> RpcResult<PhaseResult>;

    /// Execute a local query (part of distributed query).
    ///
    /// Returns matching nodes from this shard's portion of the graph.
    /// Results are combined by the coordinator using scatter-gather.
    async fn local_query(req: LocalQueryRequest) -> RpcResult<LocalQueryResult>;

    /// Get this shard's per-term document frequencies for global DF computation.
    ///
    /// Returns a map of term -> document frequency for the requested `terms`.
    /// It is used when aggregating global DF for TF-IDF scoring.
    // NOTE(review): the method name says "term frequencies" while the values are
    // documented as document frequencies; confirm which quantity is returned.
    async fn get_term_frequencies(terms: Vec<String>) -> RpcResult<HashMap<String, u64>>;

    /// Fetch a node's full data (for ghost node resolution).
    ///
    /// Used when a shard needs detailed information about a node
    /// that exists on another shard. Returns `Ok(None)` when no data is
    /// returned for `id`. Presumably this means the node is not stored on
    /// this shard; confirm.
    async fn get_node(id: NodeId) -> RpcResult<Option<NodeData>>;

    /// Health check.
    ///
    /// Returns the current health status of this shard including
    /// resource usage and processing statistics.
    async fn health_check() -> RpcResult<ShardHealth>;

    /// Resolve cross-shard edges by fetching ghost node data.
    ///
    /// Batch operation to fetch data for multiple nodes at once,
    /// creating ghost node representations for cross-shard edges.
    async fn resolve_ghost_nodes(node_ids: Vec<NodeId>) -> RpcResult<Vec<GhostNode>>;

    /// Get the list of nodes connected to a given node.
    ///
    /// Returns node IDs of all neighbors, including cross-shard references.
    async fn get_neighbors(node_id: NodeId) -> RpcResult<Vec<NodeId>>;

    /// Receive cross-shard signals during the Exchange phase.
    ///
    /// Signals from other shards are delivered here for local processing.
    async fn receive_signals(signals: Vec<crate::rpc::messages::CrossShardSignal>)
        -> RpcResult<()>;
}
117
/// Service provided by the coordinator.
///
/// The coordinator manages shard registration, tick synchronization,
/// query distribution, and global state aggregation.
// NOTE(review): `#[tarpc::service]` derives the request/response message types
// from this trait, so method order and parameter names are presumably part of
// the wire protocol. Confirm before reordering or renaming anything here.
#[tarpc::service]
pub trait CoordinatorService {
    /// Register a new shard with the coordinator.
    ///
    /// Returns the assigned shard ID. The coordinator will include
    /// this shard in subsequent tick coordination.
    async fn register(info: ShardInfo) -> RpcResult<ShardId>;

    /// Unregister a shard from the coordinator.
    ///
    /// Should be called during graceful shutdown. The coordinator
    /// will stop routing requests to this shard.
    async fn unregister(shard_id: ShardId) -> RpcResult<()>;

    /// Report that a shard has completed a tick phase.
    ///
    /// Used for barrier synchronization. The coordinator tracks
    /// completion across all shards before advancing to the next phase.
    /// `tick` is the tick number that the completed `phase` belongs to.
    async fn phase_complete(shard_id: ShardId, phase: TickPhase, tick: u64) -> RpcResult<()>;

    /// Get the shard responsible for a document.
    ///
    /// Uses consistent hashing to determine which shard owns a document.
    /// It returns a bare `ShardId` rather than an `RpcResult`, so the service
    /// has no way to report an application-level error from this call.
    // NOTE(review): confirm what is returned when no shards are registered.
    async fn route_document(doc_id: DocumentId) -> ShardId;

    /// Get the shard responsible for a node.
    ///
    /// Uses consistent hashing based on node ID.
    /// Like `route_document`, it returns a bare `ShardId` with no
    /// application-level error path.
    // NOTE(review): confirm what is returned when no shards are registered.
    async fn route_node(node_id: NodeId) -> ShardId;

    /// Get global document frequencies for TF-IDF.
    ///
    /// Returns per-term frequencies aggregated across all shards, keyed by term.
    // NOTE(review): presumably built from each shard's `get_term_frequencies`
    // response; confirm whether unseen terms are omitted or reported as 0.
    async fn get_global_df(terms: Vec<String>) -> RpcResult<HashMap<String, u64>>;

    /// Signal ready for next phase (barrier).
    ///
    /// Returns `true` when all shards are ready and the phase can proceed.
    // NOTE(review): overlaps with `phase_complete`; confirm which call shards are
    // expected to make, or whether both are required.
    async fn barrier_ready(shard_id: ShardId, phase: TickPhase, tick: u64) -> RpcResult<bool>;

    /// Get current tick number.
    ///
    /// Returns the global tick counter maintained by the coordinator.
    /// The return type is a bare `u64`, not an `RpcResult`.
    async fn current_tick() -> u64;

    /// Get all registered shards.
    ///
    /// Returns information about all shards in the cluster.
    /// The return type is a bare `Vec`, not an `RpcResult`.
    async fn list_shards() -> Vec<ShardInfo>;

    /// Request to start a new tick.
    ///
    /// Only succeeds if no tick is currently in progress.
    // NOTE(review): the returned `u64` is presumably the number of the tick that was
    // just started; confirm.
    async fn start_tick() -> RpcResult<u64>;

    /// Get the status of the current tick.
    ///
    /// Returns the current phase and which shards have completed it,
    /// as a `TickStatus`.
    async fn tick_status() -> RpcResult<TickStatus>;
}
182
183/// Status of the current tick across all shards.
184#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
185pub struct TickStatus {
186 /// Current tick number.
187 pub tick: u64,
188 /// Current phase being executed.
189 pub phase: TickPhase,
190 /// Shards that have completed the current phase.
191 pub completed_shards: Vec<ShardId>,
192 /// Shards that are still processing.
193 pub pending_shards: Vec<ShardId>,
194 /// Whether the tick is complete.
195 pub tick_complete: bool,
196}