1use std::fmt;
2
3use nnrp_core::{
4 validate_result_drop_header, CacheObjectKind, CommonHeader, ConnectionLifecycle,
5 FlowUpdateMetadata, FrameSubmitMetadata, InFlightPolicy, MessageType, ResultPushMetadata,
6 SessionCloseAckMetadata, SessionCloseMetadata, SessionCloseReason, SessionMigrateAckMetadata,
7 SessionMigrateMetadata, SessionOpenAckMetadata, SessionOpenMetadata, SessionPatchAckMetadata,
8 SessionPatchMetadata, SessionPriorityClass, SessionStatus, TransportId,
9 FRAME_SUBMIT_METADATA_LEN, RESULT_PUSH_METADATA_LEN, SESSION_CLOSE_ACK_METADATA_LEN,
10 SESSION_CLOSE_METADATA_LEN, SESSION_ERROR_NONE, SESSION_MIGRATE_ACK_METADATA_LEN,
11 SESSION_MIGRATE_METADATA_LEN, SESSION_OPEN_METADATA_LEN, SESSION_PATCH_ACK_METADATA_LEN,
12 SESSION_PATCH_METADATA_LEN, STANDARD_PROFILE_TOKEN, TOKEN_DELTA_SCHEMA_ID,
13 TOKEN_DELTA_SCHEMA_VERSION,
14};
15
16use crate::{
17 BoxedFramedTransport, FramedTransport, RuntimeError, RuntimePacket, RuntimeTransportKind,
18 TcpTransport,
19};
20
21#[derive(Debug, Clone, PartialEq, Eq)]
22pub struct NnrpClientConfig {
23 pub transport: RuntimeTransportKind,
24 pub requested_session_id: u32,
25 pub profile_id: u16,
26 pub schema_id: u32,
27 pub schema_version: u32,
28 pub priority_class: SessionPriorityClass,
29 pub default_deadline_ms: u32,
30 pub max_in_flight_operations: u16,
31 pub lease_ttl_hint_ms: u32,
32 pub allow_resume: bool,
33 pub resume_token_bytes: u32,
34 pub cache_hints: Vec<CacheObjectKind>,
35}
36
37impl Default for NnrpClientConfig {
38 fn default() -> Self {
39 Self {
40 transport: RuntimeTransportKind::Tcp,
41 requested_session_id: 1,
42 profile_id: STANDARD_PROFILE_TOKEN,
43 schema_id: TOKEN_DELTA_SCHEMA_ID,
44 schema_version: TOKEN_DELTA_SCHEMA_VERSION,
45 priority_class: SessionPriorityClass::Balanced,
46 default_deadline_ms: 500,
47 max_in_flight_operations: 4,
48 lease_ttl_hint_ms: 30_000,
49 allow_resume: false,
50 resume_token_bytes: 0,
51 cache_hints: Vec::new(),
52 }
53 }
54}
55
56impl NnrpClientConfig {
57 pub fn with_transport(mut self, transport: RuntimeTransportKind) -> Self {
58 self.transport = transport;
59 self
60 }
61
62 pub fn with_cache_hints(mut self, cache_hints: impl Into<Vec<CacheObjectKind>>) -> Self {
63 self.cache_hints = cache_hints.into();
64 self
65 }
66
67 pub fn with_resume(mut self, resume_token_bytes: u32) -> Self {
68 self.allow_resume = true;
69 self.resume_token_bytes = resume_token_bytes;
70 self
71 }
72}
73
/// A client that owns a framed transport but has not opened a session yet.
///
/// `open_session` consumes the client and hands its transport and lifecycle
/// to the resulting `NnrpClientSession`.
pub struct NnrpClient {
    /// Boxed transport used to send SESSION_OPEN and read its acknowledgement.
    transport: BoxedFramedTransport,
    /// Configuration the SESSION_OPEN metadata is built from.
    config: NnrpClientConfig,
    /// Lifecycle tracker; created with `ConnectionLifecycle::new()` and
    /// updated with the SESSION_OPEN_ACK during `open_session`.
    lifecycle: ConnectionLifecycle,
}
79
/// An opened session, created by `NnrpClient::open_session`.
pub struct NnrpClientSession {
    /// Session id taken from the SESSION_OPEN_ACK; written into outgoing
    /// headers and compared against the headers of incoming packets.
    session_id: u32,
    /// Frame id the next submitted frame will use; starts at 1.
    next_frame_id: u32,
    /// Transport taken over from the client that opened the session.
    transport: BoxedFramedTransport,
    /// Lifecycle tracker taken over from the client that opened the session.
    lifecycle: ConnectionLifecycle,
}
86
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct NnrpResult {
89 pub frame_id: u32,
90 pub metadata: ResultPushMetadata,
91 pub body: Vec<u8>,
92}
93
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub enum NnrpClientEvent {
96 Result(NnrpResult),
97 ResultDrop { frame_id: u32 },
98 FlowUpdate(FlowUpdateMetadata),
99}
100
101impl NnrpClient {
102 pub async fn connect_tcp(
103 addr: impl tokio::net::ToSocketAddrs,
104 config: NnrpClientConfig,
105 ) -> Result<Self, RuntimeError> {
106 if config.transport != RuntimeTransportKind::Tcp {
107 return Err(RuntimeError::UnsupportedTransport(
108 "client config selected a non-TCP transport for connect_tcp",
109 ));
110 }
111 Self::from_transport(TcpTransport::connect(addr).await?, config)
112 }
113
114 pub async fn connect_quic(
115 _endpoint: &str,
116 config: NnrpClientConfig,
117 ) -> Result<Self, RuntimeError> {
118 if config.transport != RuntimeTransportKind::Quic {
119 return Err(RuntimeError::UnsupportedTransport(
120 "client config selected a non-QUIC transport for connect_quic",
121 ));
122 }
123 Err(RuntimeError::UnsupportedTransport(
124 "QUIC provider is not installed; use from_transport with a QUIC FramedTransport",
125 ))
126 }
127
128 pub fn from_transport<T>(transport: T, config: NnrpClientConfig) -> Result<Self, RuntimeError>
129 where
130 T: FramedTransport + 'static,
131 {
132 Self::from_boxed_transport(Box::new(transport), config)
133 }
134
135 pub fn from_boxed_transport(
136 transport: BoxedFramedTransport,
137 config: NnrpClientConfig,
138 ) -> Result<Self, RuntimeError> {
139 if transport.transport_kind() != config.transport {
140 return Err(RuntimeError::UnsupportedTransport(
141 "client config transport does not match the provided transport slot",
142 ));
143 }
144 Ok(Self {
145 transport,
146 config,
147 lifecycle: ConnectionLifecycle::new(),
148 })
149 }
150
151 pub async fn open_session(mut self) -> Result<NnrpClientSession, RuntimeError> {
152 let metadata = self.session_open_metadata();
153 let mut metadata_bytes = vec![0u8; SESSION_OPEN_METADATA_LEN];
154 metadata.write(&mut metadata_bytes)?;
155
156 let header = CommonHeader::new(
157 MessageType::SessionOpen,
158 SESSION_OPEN_METADATA_LEN as u32,
159 0,
160 );
161 self.transport
162 .write_packet(&RuntimePacket::new(header, metadata_bytes, Vec::new())?)
163 .await?;
164
165 let ack_packet = self.transport.read_packet().await?;
166 if ack_packet.header.message_type != MessageType::SessionOpenAck {
167 return Err(RuntimeError::UnexpectedMessage(
168 "client expected SESSION_OPEN_ACK",
169 ));
170 }
171
172 let ack = SessionOpenAckMetadata::parse(&ack_packet.metadata)?;
173 nnrp_core::validate_session_recovery_ack(&metadata, &ack)?;
174 if !matches!(
175 ack.session_status,
176 SessionStatus::Opened | SessionStatus::Resumed
177 ) {
178 return Err(RuntimeError::UnexpectedMessage(
179 "client session open was not accepted",
180 ));
181 }
182 self.lifecycle.apply_session_open_ack(&ack)?;
183
184 Ok(NnrpClientSession {
185 session_id: ack.session_id,
186 next_frame_id: 1,
187 transport: self.transport,
188 lifecycle: self.lifecycle,
189 })
190 }
191
192 fn session_open_metadata(&self) -> SessionOpenMetadata {
193 SessionOpenMetadata {
194 requested_session_id: self.config.requested_session_id,
195 profile_id: self.config.profile_id,
196 priority_class: self.config.priority_class,
197 session_flags: if self.config.allow_resume {
198 nnrp_core::SESSION_FLAG_ALLOW_RESUME
199 } else {
200 0
201 },
202 schema_id: self.config.schema_id,
203 schema_version: self.config.schema_version,
204 default_deadline_ms: self.config.default_deadline_ms,
205 max_in_flight_operations: self.config.max_in_flight_operations,
206 lease_ttl_hint_ms: self.config.lease_ttl_hint_ms,
207 resume_token_bytes: self.config.resume_token_bytes,
208 auth_bytes: 0,
209 session_extension_bytes: 0,
210 client_session_tag: self.config.requested_session_id as u64,
211 }
212 }
213}
214
215impl NnrpClientSession {
216 pub fn session_id(&self) -> u32 {
217 self.session_id
218 }
219
220 pub fn lifecycle(&self) -> &ConnectionLifecycle {
221 &self.lifecycle
222 }
223
224 pub async fn submit(
225 &mut self,
226 metadata: FrameSubmitMetadata,
227 body: Vec<u8>,
228 ) -> Result<u32, RuntimeError> {
229 self.submit_nowait(metadata, body).await
230 }
231
232 pub async fn submit_nowait(
233 &mut self,
234 metadata: FrameSubmitMetadata,
235 body: Vec<u8>,
236 ) -> Result<u32, RuntimeError> {
237 let frame_id = self.next_frame_id;
238 self.next_frame_id = self
239 .next_frame_id
240 .checked_add(1)
241 .ok_or(RuntimeError::FrameIdOverflow)?;
242
243 let mut header = CommonHeader::new(
244 MessageType::FrameSubmit,
245 FRAME_SUBMIT_METADATA_LEN as u32,
246 body.len() as u32,
247 );
248 header.session_id = self.session_id;
249 header.frame_id = frame_id;
250
251 self.transport
252 .write_packet(&RuntimePacket::new(
253 header,
254 metadata.to_bytes()?.to_vec(),
255 body,
256 )?)
257 .await?;
258 Ok(frame_id)
259 }
260
261 pub async fn await_result(&mut self) -> Result<NnrpResult, RuntimeError> {
262 match self.await_event().await? {
263 NnrpClientEvent::Result(result) => Ok(result),
264 NnrpClientEvent::ResultDrop { .. } => Err(RuntimeError::UnexpectedMessage(
265 "client expected RESULT_PUSH but received RESULT_DROP",
266 )),
267 NnrpClientEvent::FlowUpdate(_) => Err(RuntimeError::UnexpectedMessage(
268 "client expected RESULT_PUSH but received FLOW_UPDATE",
269 )),
270 }
271 }
272
273 pub async fn await_event(&mut self) -> Result<NnrpClientEvent, RuntimeError> {
274 let packet = self.transport.read_packet().await?;
275 match packet.header.message_type {
276 MessageType::ResultPush => {
277 self.require_session_packet(&packet, "client received result for another session")?;
278 if packet.metadata.len() != RESULT_PUSH_METADATA_LEN {
279 return Err(RuntimeError::UnexpectedMessage(
280 "client received malformed RESULT_PUSH metadata length",
281 ));
282 }
283 Ok(NnrpClientEvent::Result(NnrpResult {
284 frame_id: packet.header.frame_id,
285 metadata: ResultPushMetadata::parse(&packet.metadata)?,
286 body: packet.body,
287 }))
288 }
289 MessageType::ResultDrop => {
290 self.require_session_packet(&packet, "client received drop for another session")?;
291 validate_result_drop_header(&packet.header)?;
292 Ok(NnrpClientEvent::ResultDrop {
293 frame_id: packet.header.frame_id,
294 })
295 }
296 MessageType::FlowUpdate => {
297 let metadata = FlowUpdateMetadata::parse(&packet.metadata)?;
298 self.lifecycle
299 .validate_flow_update(&packet.header, &metadata)?;
300 Ok(NnrpClientEvent::FlowUpdate(metadata))
301 }
302 _ => Err(RuntimeError::UnexpectedMessage(
303 "client expected RESULT_PUSH, RESULT_DROP, or FLOW_UPDATE",
304 )),
305 }
306 }
307
308 pub async fn cancel_frame(&mut self, frame_id: u32) -> Result<(), RuntimeError> {
309 let mut header = CommonHeader::new(MessageType::FrameCancel, 0, 0);
310 header.session_id = self.session_id;
311 header.frame_id = frame_id;
312 self.transport
313 .write_packet(&RuntimePacket::new(header, Vec::new(), Vec::new())?)
314 .await
315 }
316
317 pub async fn patch_session(
318 &mut self,
319 patch: SessionPatchMetadata,
320 ) -> Result<SessionPatchAckMetadata, RuntimeError> {
321 let mut header = CommonHeader::new(
322 MessageType::SessionPatch,
323 SESSION_PATCH_METADATA_LEN as u32,
324 patch.profile_patch_bytes,
325 );
326 header.session_id = self.session_id;
327 self.transport
328 .write_packet(&RuntimePacket::new(
329 header,
330 patch.to_bytes()?.to_vec(),
331 Vec::new(),
332 )?)
333 .await?;
334
335 let ack_packet = self.transport.read_packet().await?;
336 if ack_packet.header.message_type != MessageType::SessionPatchAck {
337 return Err(RuntimeError::UnexpectedMessage(
338 "client expected SESSION_PATCH_ACK",
339 ));
340 }
341 self.require_session_packet(&ack_packet, "client received patch ack for another session")?;
342 if ack_packet.metadata.len() != SESSION_PATCH_ACK_METADATA_LEN {
343 return Err(RuntimeError::UnexpectedMessage(
344 "client received malformed SESSION_PATCH_ACK metadata length",
345 ));
346 }
347 Ok(SessionPatchAckMetadata::parse(&ack_packet.metadata)?)
348 }
349
350 pub async fn migrate_transport(
351 &mut self,
352 request: SessionMigrateMetadata,
353 ) -> Result<SessionMigrateAckMetadata, RuntimeError> {
354 let mut header = CommonHeader::new(
355 MessageType::SessionMigrate,
356 SESSION_MIGRATE_METADATA_LEN as u32,
357 0,
358 );
359 header.session_id = self.session_id;
360 self.transport
361 .write_packet(&RuntimePacket::new(
362 header,
363 request.to_bytes()?.to_vec(),
364 Vec::new(),
365 )?)
366 .await?;
367
368 let ack_packet = self.transport.read_packet().await?;
369 if ack_packet.header.message_type != MessageType::SessionMigrateAck {
370 return Err(RuntimeError::UnexpectedMessage(
371 "client expected SESSION_MIGRATE_ACK",
372 ));
373 }
374 self.require_session_packet(
375 &ack_packet,
376 "client received migrate ack for another session",
377 )?;
378 if ack_packet.metadata.len() != SESSION_MIGRATE_ACK_METADATA_LEN {
379 return Err(RuntimeError::UnexpectedMessage(
380 "client received malformed SESSION_MIGRATE_ACK metadata length",
381 ));
382 }
383 let ack = SessionMigrateAckMetadata::parse(&ack_packet.metadata)?;
384 nnrp_core::validate_migration_recovery(&request, &ack)?;
385 Ok(ack)
386 }
387
388 pub fn build_migration_request(
389 &self,
390 new_transport_id: TransportId,
391 last_result_frame_id: u64,
392 client_migrate_ts_us: u64,
393 ) -> SessionMigrateMetadata {
394 SessionMigrateMetadata {
395 old_transport_id: self.transport.transport_kind().transport_id(),
396 new_transport_id,
397 last_result_frame_id,
398 client_migrate_ts_us,
399 }
400 }
401
402 pub async fn close(mut self) -> Result<(), RuntimeError> {
403 let close = SessionCloseMetadata {
404 close_reason: SessionCloseReason::ClientShutdown,
405 in_flight_policy: InFlightPolicy::Drain,
406 drain_timeout_ms: 0,
407 last_operation_id: self.next_frame_id.saturating_sub(1) as u64,
408 session_error_code: SESSION_ERROR_NONE,
409 session_close_tag: self.session_id,
410 };
411 self.close_with(close).await?;
412 self.transport.close().await
413 }
414
415 pub async fn close_with(
416 &mut self,
417 close: SessionCloseMetadata,
418 ) -> Result<SessionCloseAckMetadata, RuntimeError> {
419 let mut header = CommonHeader::new(
420 MessageType::SessionClose,
421 SESSION_CLOSE_METADATA_LEN as u32,
422 0,
423 );
424 header.session_id = self.session_id;
425 self.lifecycle.begin_session_close(&header, &close)?;
426 self.transport
427 .write_packet(&RuntimePacket::new(
428 header,
429 close.to_bytes()?.to_vec(),
430 Vec::new(),
431 )?)
432 .await?;
433
434 let ack_packet = self.transport.read_packet().await?;
435 if ack_packet.header.message_type != MessageType::SessionCloseAck {
436 return Err(RuntimeError::UnexpectedMessage(
437 "client expected SESSION_CLOSE_ACK",
438 ));
439 }
440 if ack_packet.header.session_id != self.session_id {
441 return Err(RuntimeError::UnexpectedMessage(
442 "client received close ack for another session",
443 ));
444 }
445 if ack_packet.metadata.len() != SESSION_CLOSE_ACK_METADATA_LEN {
446 return Err(RuntimeError::UnexpectedMessage(
447 "client received malformed SESSION_CLOSE_ACK metadata length",
448 ));
449 }
450
451 let ack = SessionCloseAckMetadata::parse(&ack_packet.metadata)?;
452 self.lifecycle
453 .apply_session_close_ack(&ack_packet.header, &ack)?;
454 Ok(ack)
455 }
456
457 pub async fn close_transport(mut self) -> Result<(), RuntimeError> {
458 self.transport.close().await
459 }
460
461 fn require_session_packet(
462 &self,
463 packet: &RuntimePacket,
464 message: &'static str,
465 ) -> Result<(), RuntimeError> {
466 if packet.header.session_id != self.session_id {
467 return Err(RuntimeError::UnexpectedMessage(message));
468 }
469 Ok(())
470 }
471}
472
473impl fmt::Debug for NnrpClient {
474 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
475 formatter
476 .debug_struct("NnrpClient")
477 .field("transport", &self.transport.transport_kind())
478 .field("config", &self.config)
479 .field("lifecycle", &self.lifecycle)
480 .finish()
481 }
482}
483
484impl fmt::Debug for NnrpClientSession {
485 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
486 formatter
487 .debug_struct("NnrpClientSession")
488 .field("session_id", &self.session_id)
489 .field("next_frame_id", &self.next_frame_id)
490 .field("transport", &self.transport.transport_kind())
491 .field("lifecycle", &self.lifecycle)
492 .finish()
493 }
494}