phago_distributed/rpc/messages.rs
1//! Additional message types for RPC communication.
2//!
3//! This module defines message structures for various distributed
4//! operations that don't fit directly into the service traits.
5
6use crate::types::{CrossShardEdge, ScoredNode, ShardId};
7use phago_core::types::{AgentId, NodeId, Position, SignalType, Tick};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11/// Message indicating a distributed tick should start.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct StartTickMessage {
14 /// The tick number to start.
15 pub tick: Tick,
16 /// The shard that initiated the tick (usually coordinator).
17 pub initiator: ShardId,
18 /// Timestamp when the tick was initiated.
19 pub timestamp_ms: u64,
20}
21
22/// Message for cross-shard edge notification.
23///
24/// Sent during the Exchange phase to notify other shards
25/// about edges that cross shard boundaries.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct CrossShardEdgeNotification {
28 /// The edges being reported.
29 pub edges: Vec<CrossShardEdge>,
30 /// The shard sending this notification.
31 pub source_shard: ShardId,
32 /// The tick during which these edges were created.
33 pub tick: Tick,
34}
35
36/// Query scatter request (from coordinator to shards).
37///
38/// Part of the scatter-gather query pattern. The coordinator
39/// sends this to all shards to execute local queries.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct QueryScatterRequest {
42 /// Unique identifier for this query.
43 pub query_id: u64,
44 /// Search terms for the query.
45 pub terms: Vec<String>,
46 /// Maximum results to return from each shard.
47 pub max_local_results: usize,
48 /// Optional embedding vector for semantic search.
49 pub embedding: Option<Vec<f32>>,
50 /// Whether to search ghost nodes.
51 pub include_ghosts: bool,
52}
53
54/// Query gather response (from shards to coordinator).
55///
56/// Shards send this back to the coordinator with their local results.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct QueryGatherResponse {
59 /// The query ID this response is for.
60 pub query_id: u64,
61 /// The shard that produced these results.
62 pub shard_id: ShardId,
63 /// Matching nodes with scores.
64 pub results: Vec<ScoredNode>,
65 /// Term frequencies for TF-IDF computation.
66 pub term_frequencies: HashMap<String, u64>,
67 /// Total document count on this shard (for IDF calculation).
68 pub total_documents: u64,
69 /// Time taken to execute the query in milliseconds.
70 pub execution_time_ms: u64,
71}
72
73/// A signal that crosses shard boundaries.
74///
75/// Sent during the Exchange phase when an agent's signal
76/// affects regions on other shards.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct CrossShardSignal {
79 /// Type of the signal.
80 pub signal_type: SignalType,
81 /// Intensity of the signal.
82 pub intensity: f64,
83 /// Position where the signal was emitted.
84 pub position: Position,
85 /// Agent that emitted the signal.
86 pub emitter: AgentId,
87 /// Tick when the signal was emitted.
88 pub tick: Tick,
89 /// Source shard where the signal originated.
90 pub source_shard: ShardId,
91}
92
93/// Heartbeat message from shard to coordinator.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct HeartbeatMessage {
96 /// The shard sending the heartbeat.
97 pub shard_id: ShardId,
98 /// Current tick on the shard.
99 pub current_tick: Tick,
100 /// Number of agents active on the shard.
101 pub agent_count: u64,
102 /// Number of documents stored.
103 pub document_count: u64,
104 /// Number of nodes in the graph.
105 pub node_count: u64,
106 /// Memory usage in bytes.
107 pub memory_bytes: u64,
108 /// Timestamp of the heartbeat.
109 pub timestamp_ms: u64,
110}
111
112/// Response to a heartbeat.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct HeartbeatResponse {
115 /// Whether the coordinator acknowledged the heartbeat.
116 pub acknowledged: bool,
117 /// Expected tick (for drift detection).
118 pub expected_tick: Tick,
119 /// Any pending commands for the shard.
120 pub commands: Vec<ShardCommand>,
121}
122
123/// Commands that the coordinator can send to shards.
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub enum ShardCommand {
126 /// Pause processing (e.g., for maintenance).
127 Pause,
128 /// Resume processing.
129 Resume,
130 /// Trigger a consistency check.
131 ConsistencyCheck,
132 /// Compact the local graph.
133 Compact,
134 /// Sync ghost nodes.
135 SyncGhosts,
136 /// Shutdown gracefully.
137 Shutdown,
138}
139
140/// Request to transfer a node to another shard.
141///
142/// Used during rebalancing operations.
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct NodeTransferRequest {
145 /// Node to transfer.
146 pub node_id: NodeId,
147 /// Current shard (source).
148 pub from_shard: ShardId,
149 /// Target shard (destination).
150 pub to_shard: ShardId,
151 /// Include connected edges.
152 pub include_edges: bool,
153}
154
155/// Response to a node transfer request.
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct NodeTransferResponse {
158 /// Whether the transfer was successful.
159 pub success: bool,
160 /// Node ID that was transferred.
161 pub node_id: NodeId,
162 /// Number of edges transferred with the node.
163 pub edges_transferred: u64,
164 /// Any error message if transfer failed.
165 pub error: Option<String>,
166}
167
168/// Batch of updates to apply atomically.
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct BatchUpdate {
171 /// Updates to apply.
172 pub updates: Vec<UpdateOperation>,
173 /// Tick during which these updates should be applied.
174 pub tick: Tick,
175 /// Whether all updates must succeed (atomic).
176 pub atomic: bool,
177}
178
179/// A single update operation.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub enum UpdateOperation {
182 /// Create or update a node.
183 UpsertNode {
184 id: NodeId,
185 label: String,
186 position: Position,
187 embedding: Option<Vec<f32>>,
188 },
189 /// Create or update an edge.
190 UpsertEdge {
191 from: NodeId,
192 to: NodeId,
193 weight: f64,
194 },
195 /// Delete a node.
196 DeleteNode { id: NodeId },
197 /// Delete an edge.
198 DeleteEdge { from: NodeId, to: NodeId },
199 /// Increment edge weight (Hebbian learning).
200 IncrementEdge {
201 from: NodeId,
202 to: NodeId,
203 delta: f64,
204 },
205}
206
207/// Result of applying a batch update.
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct BatchUpdateResult {
210 /// Number of operations that succeeded.
211 pub succeeded: u64,
212 /// Number of operations that failed.
213 pub failed: u64,
214 /// Errors for failed operations (index -> error).
215 pub errors: HashMap<usize, String>,
216}