// UDS server used by the Valor master to accept worker registration requests over a Unix domain socket.
use std::path::PathBuf;
use ractor::ActorRef;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::UnixListener,
};
use crate::{
master::ValorMasterMessage,
types::{ValorID, ValorIdExt},
uds::{UdsRegistrationResponse, UdsWorkerRegistration},
};
/// # UDS Server
///
/// Unix domain socket (UDS) server through which the Valor master receives
/// worker registration requests. Each accepted registration is forwarded to
/// the master actor as a `ValorMasterMessage::SouthboundRegisterWorker`
/// message, and the worker is answered with a JSON-encoded
/// `UdsRegistrationResponse`.
///
/// The type derives `Clone`; a clone carries the same socket path, master id
/// and actor reference as the original.
#[derive(Clone)]
pub struct UdsServer {
/// Filesystem path of the master's UDS socket; this is the path the
/// listener binds to when the server is started.
uds_path: PathBuf,
/// Master's unique identifier, built from the id string given to `new`
/// and used to label log lines and the accept-loop tracing span.
master_id: ValorID,
/// Reference to the master actor that receives worker registrations.
master_actor: ActorRef<ValorMasterMessage>,
}
impl UdsServer {
/// Create a new UDS server for the master
pub fn new(
uds_path: PathBuf,
master_id: &str,
master_actor: ActorRef<ValorMasterMessage>,
) -> Self {
Self {
uds_path,
master_id: ValorID::new_master(master_id),
master_actor,
}
}
/// Start the UDS server and listen for worker registration requests
pub async fn start(&self) -> anyhow::Result<()> {
// If socket exists, check if an active master is already listening
if self.uds_path.exists() {
match tokio::net::UnixStream::connect(&self.uds_path).await {
Ok(_) => {
// Another master is active; refuse to start
anyhow::bail!(
"UDS path {} already in use by an active master; refusing to start",
self.uds_path.display()
);
}
Err(_) => {
// Stale socket; remove it
let _ = std::fs::remove_file(&self.uds_path);
}
}
}
// Create Unix domain socket listener
let listener = UnixListener::bind(&self.uds_path)?;
tracing::info!(
"UDS server started for master {} on {}",
self.master_id,
self.uds_path.display()
);
// Handle worker registration requests
let master_id = self.master_id.clone();
let master_actor = self.master_actor.clone();
let uds_path_display = self.uds_path.display().to_string();
let srv_span = tracing::info_span!(
"flow.master.uds.accept_loop",
master_id = %master_id,
uds_path = %uds_path_display
);
tokio::spawn({
let srv_span = srv_span.clone();
async move {
let _e = srv_span.enter();
loop {
match listener.accept().await {
Ok((mut stream, _addr)) => {
tracing::info!("Master {} received connection from worker", master_id);
// Read registration data from stream
let mut buffer = Vec::new();
let mut chunk = [0u8; 1024];
loop {
match stream.read(&mut chunk).await {
Ok(n) => {
if n == 0 {
break; // EOF
}
buffer.extend_from_slice(&chunk[..n]);
}
Err(e) => {
tracing::error!("Error reading from UDS stream: {}", e);
break;
}
}
}
// Deserialize registration
match serde_json::from_slice::<UdsWorkerRegistration>(&buffer) {
Ok(registration) => {
let conn_span = tracing::info_span!(
"flow.master.uds.connection",
worker_id = %registration.worker_id
);
tracing::info!(
parent: &conn_span,
"Master {} received registration from worker {}",
master_id,
registration.worker_id
);
// Always accept via UDS; leave duplicate handling to Master
let _ = master_actor.cast(
ValorMasterMessage::SouthboundRegisterWorker(
registration.worker_id,
registration.ipv4_addr,
),
);
// Send success response
let response = UdsRegistrationResponse::Accepted;
let response_bytes =
serde_json::to_vec(&response).unwrap_or_default();
let _ = stream.write_all(&response_bytes).await;
}
Err(e) => {
tracing::error!("Failed to deserialize registration: {}", e);
let response = UdsRegistrationResponse::Rejected {
reason: format!("Invalid registration format: {e}"),
};
let response_bytes =
serde_json::to_vec(&response).unwrap_or_default();
let _ = stream.write_all(&response_bytes).await;
}
}
}
Err(e) => {
tracing::error!("UDS server accept error: {}", e);
break;
}
}
}
}
});
Ok(())
}
}