Skip to main content

phago_distributed/rpc/
messages.rs

1//! Additional message types for RPC communication.
2//!
3//! This module defines message structures for various distributed
4//! operations that don't fit directly into the service traits.
5
6use crate::types::{CrossShardEdge, ScoredNode, ShardId};
7use phago_core::types::{AgentId, NodeId, Position, SignalType, Tick};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11/// Message indicating a distributed tick should start.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct StartTickMessage {
14    /// The tick number to start.
15    pub tick: Tick,
16    /// The shard that initiated the tick (usually coordinator).
17    pub initiator: ShardId,
18    /// Timestamp when the tick was initiated.
19    pub timestamp_ms: u64,
20}
21
22/// Message for cross-shard edge notification.
23///
24/// Sent during the Exchange phase to notify other shards
25/// about edges that cross shard boundaries.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct CrossShardEdgeNotification {
28    /// The edges being reported.
29    pub edges: Vec<CrossShardEdge>,
30    /// The shard sending this notification.
31    pub source_shard: ShardId,
32    /// The tick during which these edges were created.
33    pub tick: Tick,
34}
35
36/// Query scatter request (from coordinator to shards).
37///
38/// Part of the scatter-gather query pattern. The coordinator
39/// sends this to all shards to execute local queries.
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct QueryScatterRequest {
42    /// Unique identifier for this query.
43    pub query_id: u64,
44    /// Search terms for the query.
45    pub terms: Vec<String>,
46    /// Maximum results to return from each shard.
47    pub max_local_results: usize,
48    /// Optional embedding vector for semantic search.
49    pub embedding: Option<Vec<f32>>,
50    /// Whether to search ghost nodes.
51    pub include_ghosts: bool,
52}
53
54/// Query gather response (from shards to coordinator).
55///
56/// Shards send this back to the coordinator with their local results.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct QueryGatherResponse {
59    /// The query ID this response is for.
60    pub query_id: u64,
61    /// The shard that produced these results.
62    pub shard_id: ShardId,
63    /// Matching nodes with scores.
64    pub results: Vec<ScoredNode>,
65    /// Term frequencies for TF-IDF computation.
66    pub term_frequencies: HashMap<String, u64>,
67    /// Total document count on this shard (for IDF calculation).
68    pub total_documents: u64,
69    /// Time taken to execute the query in milliseconds.
70    pub execution_time_ms: u64,
71}
72
73/// A signal that crosses shard boundaries.
74///
75/// Sent during the Exchange phase when an agent's signal
76/// affects regions on other shards.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct CrossShardSignal {
79    /// Type of the signal.
80    pub signal_type: SignalType,
81    /// Intensity of the signal.
82    pub intensity: f64,
83    /// Position where the signal was emitted.
84    pub position: Position,
85    /// Agent that emitted the signal.
86    pub emitter: AgentId,
87    /// Tick when the signal was emitted.
88    pub tick: Tick,
89    /// Source shard where the signal originated.
90    pub source_shard: ShardId,
91}
92
93/// Heartbeat message from shard to coordinator.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct HeartbeatMessage {
96    /// The shard sending the heartbeat.
97    pub shard_id: ShardId,
98    /// Current tick on the shard.
99    pub current_tick: Tick,
100    /// Number of agents active on the shard.
101    pub agent_count: u64,
102    /// Number of documents stored.
103    pub document_count: u64,
104    /// Number of nodes in the graph.
105    pub node_count: u64,
106    /// Memory usage in bytes.
107    pub memory_bytes: u64,
108    /// Timestamp of the heartbeat.
109    pub timestamp_ms: u64,
110}
111
112/// Response to a heartbeat.
113#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct HeartbeatResponse {
115    /// Whether the coordinator acknowledged the heartbeat.
116    pub acknowledged: bool,
117    /// Expected tick (for drift detection).
118    pub expected_tick: Tick,
119    /// Any pending commands for the shard.
120    pub commands: Vec<ShardCommand>,
121}
122
123/// Commands that the coordinator can send to shards.
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub enum ShardCommand {
126    /// Pause processing (e.g., for maintenance).
127    Pause,
128    /// Resume processing.
129    Resume,
130    /// Trigger a consistency check.
131    ConsistencyCheck,
132    /// Compact the local graph.
133    Compact,
134    /// Sync ghost nodes.
135    SyncGhosts,
136    /// Shutdown gracefully.
137    Shutdown,
138}
139
140/// Request to transfer a node to another shard.
141///
142/// Used during rebalancing operations.
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct NodeTransferRequest {
145    /// Node to transfer.
146    pub node_id: NodeId,
147    /// Current shard (source).
148    pub from_shard: ShardId,
149    /// Target shard (destination).
150    pub to_shard: ShardId,
151    /// Include connected edges.
152    pub include_edges: bool,
153}
154
155/// Response to a node transfer request.
156#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct NodeTransferResponse {
158    /// Whether the transfer was successful.
159    pub success: bool,
160    /// Node ID that was transferred.
161    pub node_id: NodeId,
162    /// Number of edges transferred with the node.
163    pub edges_transferred: u64,
164    /// Any error message if transfer failed.
165    pub error: Option<String>,
166}
167
168/// Batch of updates to apply atomically.
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct BatchUpdate {
171    /// Updates to apply.
172    pub updates: Vec<UpdateOperation>,
173    /// Tick during which these updates should be applied.
174    pub tick: Tick,
175    /// Whether all updates must succeed (atomic).
176    pub atomic: bool,
177}
178
179/// A single update operation.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub enum UpdateOperation {
182    /// Create or update a node.
183    UpsertNode {
184        id: NodeId,
185        label: String,
186        position: Position,
187        embedding: Option<Vec<f32>>,
188    },
189    /// Create or update an edge.
190    UpsertEdge {
191        from: NodeId,
192        to: NodeId,
193        weight: f64,
194    },
195    /// Delete a node.
196    DeleteNode { id: NodeId },
197    /// Delete an edge.
198    DeleteEdge { from: NodeId, to: NodeId },
199    /// Increment edge weight (Hebbian learning).
200    IncrementEdge {
201        from: NodeId,
202        to: NodeId,
203        delta: f64,
204    },
205}
206
207/// Result of applying a batch update.
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct BatchUpdateResult {
210    /// Number of operations that succeeded.
211    pub succeeded: u64,
212    /// Number of operations that failed.
213    pub failed: u64,
214    /// Errors for failed operations (index -> error).
215    pub errors: HashMap<usize, String>,
216}