1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
//! Error types for the Goosefs Rust client.
//!
//! Follows the pattern from fluss-rust: a single `Error` enum with `thiserror`
//! derive, a `Result<T>` alias, and `From` conversions for common upstream errors.
use thiserror::Error;
/// Convenience type alias used throughout the crate.
pub type Result<T> = std::result::Result<T, Error>;
/// The single error type returned by goosefs-sdk operations.
///
/// `Display` and `std::error::Error` are derived through `thiserror`; the
/// `#[error(...)]` attribute on each variant is its display format.
#[derive(Debug, Error)]
pub enum Error {
    /// A gRPC status (from tonic) that was not mapped to a more specific
    /// variant. The original status is retained as the error source.
    #[error("gRPC error: {message} — {source}")]
    GrpcError {
        message: String,
        #[source]
        source: tonic::Status,
    },

    /// Failure at the gRPC transport / connection level. The underlying
    /// tonic transport error is retained as the error source.
    #[error("gRPC transport error: {message} — {source}")]
    TransportError {
        message: String,
        #[source]
        source: tonic::transport::Error,
    },

    /// The requested file or directory does not exist on Goosefs.
    ///
    /// When built by the `From<tonic::Status>` conversion in this file,
    /// `path` holds the status message text as received.
    #[error("not found: {path}")]
    NotFound { path: String },

    /// A file or directory already exists at the target location.
    ///
    /// When built by the `From<tonic::Status>` conversion in this file,
    /// `path` holds the status message text as received.
    #[error("already exists: {path}")]
    AlreadyExists { path: String },

    /// The caller is not permitted to perform the requested operation.
    #[error("permission denied: {message}")]
    PermissionDenied { message: String },

    /// The caller supplied an invalid argument.
    #[error("invalid argument: {message}")]
    InvalidArgument { message: String },

    /// A protobuf response lacked a field that is required.
    #[error("missing field in response: {field}")]
    MissingField { field: String },

    /// I/O failure while reading or writing a block.
    #[error("block IO error: {message}")]
    BlockIoError { message: String },

    /// There is no worker that can serve the request.
    #[error("no worker available: {message}")]
    NoWorkerAvailable { message: String },

    /// HA polling did not discover a primary Master.
    #[error("master unavailable: {message}")]
    MasterUnavailable { message: String },

    /// A configuration problem.
    #[error("config error: {message}")]
    ConfigError { message: String },

    /// Catch-all internal error; `source` optionally carries a boxed cause.
    #[error("internal error: {message}")]
    Internal {
        message: String,
        #[source]
        source: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    // -----------------------------------------------------------------------
    // Domain errors derived from Java server exceptions.
    //
    // The Java server's strongly-typed exceptions reach the gRPC boundary as
    // FAILED_PRECONDITION statuses. The message text contains a keyword that
    // identifies the exception, and that keyword is parsed to produce the
    // variants below rather than the opaque `GrpcError` catch-all.
    // -----------------------------------------------------------------------
    /// The file exists but is still in the INCOMPLETE state.
    ///
    /// Java: `FileIncompleteException` → gRPC `FAILED_PRECONDITION` whose
    /// message contains `"is incomplete"`. This can occur when reading a
    /// file whose writer has not yet called `close()`.
    #[error("file is incomplete: {message}")]
    FileIncomplete { message: String },

    /// A non-empty directory was deleted without `recursive = true`.
    ///
    /// Java: `DirectoryNotEmptyException` → gRPC `FAILED_PRECONDITION` whose
    /// message contains `"is not empty"`.
    #[error("directory is not empty: {message}")]
    DirectoryNotEmpty { message: String },

    /// The path that was opened is a directory rather than a file.
    ///
    /// Java: `IsDirectoryException` → gRPC `FAILED_PRECONDITION` whose
    /// message contains `"Is a directory"`. When built by the
    /// `From<tonic::Status>` conversion in this file, `path` holds the whole
    /// status message, not just the path.
    #[error("path is a directory: {path}")]
    OpenDirectory { path: String },

    /// The path string is syntactically invalid.
    ///
    /// Java: `InvalidPathException` → gRPC `INVALID_ARGUMENT` with the path
    /// in the message. This is a separate variant from `InvalidArgument` so
    /// that callers can match on path problems specifically. Note that the
    /// `From<tonic::Status>` conversion in this file maps every
    /// `INVALID_ARGUMENT` status to `InvalidArgument`, so this variant is
    /// only produced by code that constructs it explicitly.
    #[error("invalid path: {path}")]
    InvalidPath { path: String },

    // -----------------------------------------------------------------------
    // Authentication errors — these must NOT cause worker blacklisting.
    //
    // If a worker answers with UNAUTHENTICATED, the caller should surface the
    // error and leave it to the higher-level retry policy to decide whether
    // to re-authenticate, instead of permanently dropping the worker from the
    // routing table.
    // -----------------------------------------------------------------------
    /// Authenticating against the Master or a Worker failed.
    ///
    /// Produced from a tonic `UNAUTHENTICATED` status. The router **must
    /// not** mark the worker as failed on this error: the worker is healthy
    /// and only the credentials are wrong.
    #[error("authentication failed: {message}")]
    AuthenticationFailed { message: String },
}
// ---------------------------------------------------------------------------
// From conversions
// ---------------------------------------------------------------------------
impl From<tonic::Status> for Error {
fn from(status: tonic::Status) -> Self {
match status.code() {
tonic::Code::NotFound => Error::NotFound {
path: status.message().to_string(),
},
tonic::Code::AlreadyExists => Error::AlreadyExists {
path: status.message().to_string(),
},
tonic::Code::PermissionDenied => Error::PermissionDenied {
message: status.message().to_string(),
},
tonic::Code::InvalidArgument => Error::InvalidArgument {
message: status.message().to_string(),
},
// Authentication failures — surface as a dedicated variant so
// the WorkerRouter is NOT instructed to blacklist the worker.
tonic::Code::Unauthenticated => Error::AuthenticationFailed {
message: status.message().to_string(),
},
// FAILED_PRECONDITION carries several distinct Java exceptions;
// disambiguate by inspecting the message text.
//
// Java exception → gRPC message keyword mapping (verified against
// DefaultFileSystemMaster.java and GoosefsStatusException.java):
// FileIncompleteException → "is incomplete"
// DirectoryNotEmptyException → "is not empty"
// IsDirectoryException → "Is a directory"
tonic::Code::FailedPrecondition => {
let msg = status.message();
if msg.contains("is not empty") {
Error::DirectoryNotEmpty {
message: msg.to_string(),
}
} else if msg.contains("is incomplete") {
Error::FileIncomplete {
message: msg.to_string(),
}
} else if msg.contains("Is a directory") {
Error::OpenDirectory {
path: msg.to_string(),
}
} else {
Error::GrpcError {
message: format!("[{}] {}", status.code(), msg),
source: status,
}
}
}
_ => Error::GrpcError {
message: format!("[{}] {}", status.code(), status.message()),
source: status,
},
}
}
}
impl From<tonic::transport::Error> for Error {
    /// Wraps a tonic transport error in [`Error::TransportError`].
    ///
    /// The error's display text becomes `message`, and the error itself is
    /// kept as `source`.
    fn from(err: tonic::transport::Error) -> Self {
        // Render the text before `err` is moved into the variant.
        let message = err.to_string();
        Error::TransportError {
            message,
            source: err,
        }
    }
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
impl Error {
/// Returns `true` if this error is retriable (transient network / unavailable).
pub fn is_retriable(&self) -> bool {
match self {
Error::GrpcError { source, .. } => matches!(
source.code(),
tonic::Code::Unavailable | tonic::Code::DeadlineExceeded | tonic::Code::Aborted
),
Error::TransportError { .. } => true,
// Authentication failures are retriable — the SASL stream may have
// expired (e.g. after process fork or long idle). The caller should
// invalidate the cached channel and re-authenticate before retrying.
Error::AuthenticationFailed { .. } => true,
_ => false,
}
}
/// Returns `true` if the file was not found.
pub fn is_not_found(&self) -> bool {
matches!(self, Error::NotFound { .. })
}
/// Returns `true` if the path already exists.
pub fn is_already_exists(&self) -> bool {
matches!(self, Error::AlreadyExists { .. })
}
/// Returns `true` if the file exists but is in INCOMPLETE (not yet closed) state.
pub fn is_file_incomplete(&self) -> bool {
matches!(self, Error::FileIncomplete { .. })
}
/// Returns `true` if the directory is not empty.
pub fn is_directory_not_empty(&self) -> bool {
matches!(self, Error::DirectoryNotEmpty { .. })
}
/// Returns `true` if the authentication credentials were rejected.
///
/// When this returns `true` the caller should **not** mark the worker as
/// failed — the worker itself is healthy.
pub fn is_authentication_failed(&self) -> bool {
matches!(self, Error::AuthenticationFailed { .. })
}
/// Returns `true` if the error is a permission / authentication problem.
///
/// This covers both `PermissionDenied` (authorisation) and
/// `AuthenticationFailed` (authentication).
pub fn is_access_denied(&self) -> bool {
matches!(
self,
Error::PermissionDenied { .. } | Error::AuthenticationFailed { .. }
)
}
/// Convenience constructor for missing-field errors.
pub fn missing_field(field: impl Into<String>) -> Self {
Error::MissingField {
field: field.into(),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An UNAUTHENTICATED status becomes `AuthenticationFailed`.
    #[test]
    fn test_unauthenticated_maps_to_authentication_failed() {
        let err: Error = tonic::Status::unauthenticated("token expired").into();
        assert!(err.is_authentication_failed());
        // AuthenticationFailed counts as retriable: the caller is expected to
        // invalidate the cached channel and re-authenticate before retrying.
        assert!(err.is_retriable());
    }

    /// FAILED_PRECONDITION + "is not empty" → `DirectoryNotEmpty`.
    #[test]
    fn test_failed_precondition_directory_not_empty() {
        let err: Error =
            tonic::Status::failed_precondition("/foo/bar is not empty, cannot delete recursively")
                .into();
        assert!(
            err.is_directory_not_empty(),
            "expected DirectoryNotEmpty, got {:?}",
            err
        );
    }

    /// FAILED_PRECONDITION + "is incomplete" → `FileIncomplete`.
    #[test]
    fn test_failed_precondition_file_incomplete() {
        let err: Error =
            tonic::Status::failed_precondition("/tmp/partial.parquet is incomplete").into();
        assert!(
            err.is_file_incomplete(),
            "expected FileIncomplete, got {:?}",
            err
        );
    }

    /// FAILED_PRECONDITION + "Is a directory" → `OpenDirectory`.
    #[test]
    fn test_failed_precondition_is_directory() {
        let err: Error = tonic::Status::failed_precondition("/data/dir Is a directory").into();
        assert!(
            matches!(err, Error::OpenDirectory { .. }),
            "expected OpenDirectory, got {:?}",
            err
        );
    }

    /// FAILED_PRECONDITION without a known keyword stays a `GrpcError`.
    #[test]
    fn test_failed_precondition_unknown_falls_through_to_grpc_error() {
        let err: Error =
            tonic::Status::failed_precondition("some other precondition failure").into();
        assert!(
            matches!(err, Error::GrpcError { .. }),
            "expected GrpcError fallthrough, got {:?}",
            err
        );
    }

    /// A NOT_FOUND status is recognised by `is_not_found`.
    #[test]
    fn test_not_found_helper() {
        let err: Error = tonic::Status::not_found("/missing").into();
        assert!(err.is_not_found());
    }

    /// `is_access_denied` accepts both the authorisation and the
    /// authentication variant.
    #[test]
    fn test_is_access_denied_covers_both_variants() {
        let denied = Error::PermissionDenied {
            message: String::from("no"),
        };
        let unauthenticated = Error::AuthenticationFailed {
            message: String::from("expired"),
        };
        assert!(denied.is_access_denied());
        assert!(unauthenticated.is_access_denied());
    }
}