Skip to main content

nnrp_runtime/
client.rs

1use std::fmt;
2
3use nnrp_core::{
4    validate_result_drop_header, CacheObjectKind, CommonHeader, ConnectionLifecycle,
5    FlowUpdateMetadata, FrameSubmitMetadata, InFlightPolicy, MessageType, ResultPushMetadata,
6    SessionCloseAckMetadata, SessionCloseMetadata, SessionCloseReason, SessionMigrateAckMetadata,
7    SessionMigrateMetadata, SessionOpenAckMetadata, SessionOpenMetadata, SessionPatchAckMetadata,
8    SessionPatchMetadata, SessionPriorityClass, SessionStatus, TransportId,
9    FRAME_SUBMIT_METADATA_LEN, RESULT_PUSH_METADATA_LEN, SESSION_CLOSE_ACK_METADATA_LEN,
10    SESSION_CLOSE_METADATA_LEN, SESSION_ERROR_NONE, SESSION_MIGRATE_ACK_METADATA_LEN,
11    SESSION_MIGRATE_METADATA_LEN, SESSION_OPEN_METADATA_LEN, SESSION_PATCH_ACK_METADATA_LEN,
12    SESSION_PATCH_METADATA_LEN, STANDARD_PROFILE_TOKEN, TOKEN_DELTA_SCHEMA_ID,
13    TOKEN_DELTA_SCHEMA_VERSION,
14};
15
16use crate::{
17    BoxedFramedTransport, FramedTransport, RuntimeError, RuntimePacket, RuntimeTransportKind,
18    TcpTransport,
19};
20
/// Client-side settings: which transport the client runs over and the values
/// copied into the `SESSION_OPEN` metadata when a session is opened.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NnrpClientConfig {
    /// Transport kind the client is configured for; compared against the
    /// actual transport when an `NnrpClient` is constructed.
    pub transport: RuntimeTransportKind,
    /// Session id requested in `SESSION_OPEN`; also sent (widened to `u64`)
    /// as the client session tag.
    pub requested_session_id: u32,
    /// Copied into the `profile_id` field of the `SESSION_OPEN` metadata.
    pub profile_id: u16,
    /// Copied into the `schema_id` field of the `SESSION_OPEN` metadata.
    pub schema_id: u32,
    /// Copied into the `schema_version` field of the `SESSION_OPEN` metadata.
    pub schema_version: u32,
    /// Copied into the `priority_class` field of the `SESSION_OPEN` metadata.
    pub priority_class: SessionPriorityClass,
    /// Copied into the `default_deadline_ms` field of the `SESSION_OPEN` metadata.
    pub default_deadline_ms: u32,
    /// Copied into the `max_in_flight_operations` field of the `SESSION_OPEN` metadata.
    pub max_in_flight_operations: u16,
    /// Copied into the `lease_ttl_hint_ms` field of the `SESSION_OPEN` metadata.
    pub lease_ttl_hint_ms: u32,
    /// When `true`, the allow-resume session flag is set in `SESSION_OPEN`.
    pub allow_resume: bool,
    /// Copied into the `resume_token_bytes` field of the `SESSION_OPEN` metadata.
    pub resume_token_bytes: u32,
    /// Cache object kinds supplied by the caller.
    // NOTE(review): no code visible in this file reads `cache_hints` — confirm where it is consumed.
    pub cache_hints: Vec<CacheObjectKind>,
}
36
37impl Default for NnrpClientConfig {
38    fn default() -> Self {
39        Self {
40            transport: RuntimeTransportKind::Tcp,
41            requested_session_id: 1,
42            profile_id: STANDARD_PROFILE_TOKEN,
43            schema_id: TOKEN_DELTA_SCHEMA_ID,
44            schema_version: TOKEN_DELTA_SCHEMA_VERSION,
45            priority_class: SessionPriorityClass::Balanced,
46            default_deadline_ms: 500,
47            max_in_flight_operations: 4,
48            lease_ttl_hint_ms: 30_000,
49            allow_resume: false,
50            resume_token_bytes: 0,
51            cache_hints: Vec::new(),
52        }
53    }
54}
55
56impl NnrpClientConfig {
57    pub fn with_transport(mut self, transport: RuntimeTransportKind) -> Self {
58        self.transport = transport;
59        self
60    }
61
62    pub fn with_cache_hints(mut self, cache_hints: impl Into<Vec<CacheObjectKind>>) -> Self {
63        self.cache_hints = cache_hints.into();
64        self
65    }
66
67    pub fn with_resume(mut self, resume_token_bytes: u32) -> Self {
68        self.allow_resume = true;
69        self.resume_token_bytes = resume_token_bytes;
70        self
71    }
72}
73
/// Client that owns a framed transport and its configuration before a session
/// has been opened; `open_session` consumes it and yields an `NnrpClientSession`.
pub struct NnrpClient {
    /// Boxed transport used to write and read packets.
    transport: BoxedFramedTransport,
    /// Settings used to build the `SESSION_OPEN` metadata.
    config: NnrpClientConfig,
    /// Connection lifecycle tracker, created fresh when the client is constructed.
    lifecycle: ConnectionLifecycle,
}
79
/// An opened session: the handle used to submit frames, receive events, and
/// patch, migrate, or close the session.
pub struct NnrpClientSession {
    /// Session id taken from the `SESSION_OPEN_ACK` metadata.
    session_id: u32,
    /// Frame id the next submission will use; starts at 1 when the session opens.
    next_frame_id: u32,
    /// Transport taken over from the `NnrpClient` that opened the session.
    transport: BoxedFramedTransport,
    /// Lifecycle tracker taken over from the `NnrpClient` that opened the session.
    lifecycle: ConnectionLifecycle,
}
86
/// A received `RESULT_PUSH` packet, split into its parts.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NnrpResult {
    /// Frame id from the packet header.
    pub frame_id: u32,
    /// Metadata parsed from the packet's metadata section.
    pub metadata: ResultPushMetadata,
    /// Packet body bytes, moved out of the packet unchanged.
    pub body: Vec<u8>,
}
93
/// Events that `NnrpClientSession::await_event` can return.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NnrpClientEvent {
    /// A `RESULT_PUSH` packet was received.
    Result(NnrpResult),
    /// A `RESULT_DROP` packet was received; `frame_id` is the frame id from its header.
    ResultDrop { frame_id: u32 },
    /// A `FLOW_UPDATE` packet was received, carrying its parsed metadata.
    FlowUpdate(FlowUpdateMetadata),
}
100
101impl NnrpClient {
102    pub async fn connect_tcp(
103        addr: impl tokio::net::ToSocketAddrs,
104        config: NnrpClientConfig,
105    ) -> Result<Self, RuntimeError> {
106        if config.transport != RuntimeTransportKind::Tcp {
107            return Err(RuntimeError::UnsupportedTransport(
108                "client config selected a non-TCP transport for connect_tcp",
109            ));
110        }
111        Self::from_transport(TcpTransport::connect(addr).await?, config)
112    }
113
114    pub async fn connect_quic(
115        _endpoint: &str,
116        config: NnrpClientConfig,
117    ) -> Result<Self, RuntimeError> {
118        if config.transport != RuntimeTransportKind::Quic {
119            return Err(RuntimeError::UnsupportedTransport(
120                "client config selected a non-QUIC transport for connect_quic",
121            ));
122        }
123        Err(RuntimeError::UnsupportedTransport(
124            "QUIC provider is not installed; use from_transport with a QUIC FramedTransport",
125        ))
126    }
127
128    pub fn from_transport<T>(transport: T, config: NnrpClientConfig) -> Result<Self, RuntimeError>
129    where
130        T: FramedTransport + 'static,
131    {
132        Self::from_boxed_transport(Box::new(transport), config)
133    }
134
135    pub fn from_boxed_transport(
136        transport: BoxedFramedTransport,
137        config: NnrpClientConfig,
138    ) -> Result<Self, RuntimeError> {
139        if transport.transport_kind() != config.transport {
140            return Err(RuntimeError::UnsupportedTransport(
141                "client config transport does not match the provided transport slot",
142            ));
143        }
144        Ok(Self {
145            transport,
146            config,
147            lifecycle: ConnectionLifecycle::new(),
148        })
149    }
150
151    pub async fn open_session(mut self) -> Result<NnrpClientSession, RuntimeError> {
152        let metadata = self.session_open_metadata();
153        let mut metadata_bytes = vec![0u8; SESSION_OPEN_METADATA_LEN];
154        metadata.write(&mut metadata_bytes)?;
155
156        let header = CommonHeader::new(
157            MessageType::SessionOpen,
158            SESSION_OPEN_METADATA_LEN as u32,
159            0,
160        );
161        self.transport
162            .write_packet(&RuntimePacket::new(header, metadata_bytes, Vec::new())?)
163            .await?;
164
165        let ack_packet = self.transport.read_packet().await?;
166        if ack_packet.header.message_type != MessageType::SessionOpenAck {
167            return Err(RuntimeError::UnexpectedMessage(
168                "client expected SESSION_OPEN_ACK",
169            ));
170        }
171
172        let ack = SessionOpenAckMetadata::parse(&ack_packet.metadata)?;
173        nnrp_core::validate_session_recovery_ack(&metadata, &ack)?;
174        if !matches!(
175            ack.session_status,
176            SessionStatus::Opened | SessionStatus::Resumed
177        ) {
178            return Err(RuntimeError::UnexpectedMessage(
179                "client session open was not accepted",
180            ));
181        }
182        self.lifecycle.apply_session_open_ack(&ack)?;
183
184        Ok(NnrpClientSession {
185            session_id: ack.session_id,
186            next_frame_id: 1,
187            transport: self.transport,
188            lifecycle: self.lifecycle,
189        })
190    }
191
192    fn session_open_metadata(&self) -> SessionOpenMetadata {
193        SessionOpenMetadata {
194            requested_session_id: self.config.requested_session_id,
195            profile_id: self.config.profile_id,
196            priority_class: self.config.priority_class,
197            session_flags: if self.config.allow_resume {
198                nnrp_core::SESSION_FLAG_ALLOW_RESUME
199            } else {
200                0
201            },
202            schema_id: self.config.schema_id,
203            schema_version: self.config.schema_version,
204            default_deadline_ms: self.config.default_deadline_ms,
205            max_in_flight_operations: self.config.max_in_flight_operations,
206            lease_ttl_hint_ms: self.config.lease_ttl_hint_ms,
207            resume_token_bytes: self.config.resume_token_bytes,
208            auth_bytes: 0,
209            session_extension_bytes: 0,
210            client_session_tag: self.config.requested_session_id as u64,
211        }
212    }
213}
214
215impl NnrpClientSession {
    /// Returns the id of this session, as reported by the `SESSION_OPEN_ACK`.
    pub fn session_id(&self) -> u32 {
        self.session_id
    }
219
    /// Returns a shared reference to the session's connection lifecycle tracker.
    pub fn lifecycle(&self) -> &ConnectionLifecycle {
        &self.lifecycle
    }
223
    /// Submits a frame and returns the frame id assigned to it.
    ///
    /// Currently forwards directly to `submit_nowait`, so it does not wait for
    /// a result either; see that method for the error conditions.
    pub async fn submit(
        &mut self,
        metadata: FrameSubmitMetadata,
        body: Vec<u8>,
    ) -> Result<u32, RuntimeError> {
        self.submit_nowait(metadata, body).await
    }
231
232    pub async fn submit_nowait(
233        &mut self,
234        metadata: FrameSubmitMetadata,
235        body: Vec<u8>,
236    ) -> Result<u32, RuntimeError> {
237        let frame_id = self.next_frame_id;
238        self.next_frame_id = self
239            .next_frame_id
240            .checked_add(1)
241            .ok_or(RuntimeError::FrameIdOverflow)?;
242
243        let mut header = CommonHeader::new(
244            MessageType::FrameSubmit,
245            FRAME_SUBMIT_METADATA_LEN as u32,
246            body.len() as u32,
247        );
248        header.session_id = self.session_id;
249        header.frame_id = frame_id;
250
251        self.transport
252            .write_packet(&RuntimePacket::new(
253                header,
254                metadata.to_bytes()?.to_vec(),
255                body,
256            )?)
257            .await?;
258        Ok(frame_id)
259    }
260
261    pub async fn await_result(&mut self) -> Result<NnrpResult, RuntimeError> {
262        match self.await_event().await? {
263            NnrpClientEvent::Result(result) => Ok(result),
264            NnrpClientEvent::ResultDrop { .. } => Err(RuntimeError::UnexpectedMessage(
265                "client expected RESULT_PUSH but received RESULT_DROP",
266            )),
267            NnrpClientEvent::FlowUpdate(_) => Err(RuntimeError::UnexpectedMessage(
268                "client expected RESULT_PUSH but received FLOW_UPDATE",
269            )),
270        }
271    }
272
273    pub async fn await_event(&mut self) -> Result<NnrpClientEvent, RuntimeError> {
274        let packet = self.transport.read_packet().await?;
275        match packet.header.message_type {
276            MessageType::ResultPush => {
277                self.require_session_packet(&packet, "client received result for another session")?;
278                if packet.metadata.len() != RESULT_PUSH_METADATA_LEN {
279                    return Err(RuntimeError::UnexpectedMessage(
280                        "client received malformed RESULT_PUSH metadata length",
281                    ));
282                }
283                Ok(NnrpClientEvent::Result(NnrpResult {
284                    frame_id: packet.header.frame_id,
285                    metadata: ResultPushMetadata::parse(&packet.metadata)?,
286                    body: packet.body,
287                }))
288            }
289            MessageType::ResultDrop => {
290                self.require_session_packet(&packet, "client received drop for another session")?;
291                validate_result_drop_header(&packet.header)?;
292                Ok(NnrpClientEvent::ResultDrop {
293                    frame_id: packet.header.frame_id,
294                })
295            }
296            MessageType::FlowUpdate => {
297                let metadata = FlowUpdateMetadata::parse(&packet.metadata)?;
298                self.lifecycle
299                    .validate_flow_update(&packet.header, &metadata)?;
300                Ok(NnrpClientEvent::FlowUpdate(metadata))
301            }
302            _ => Err(RuntimeError::UnexpectedMessage(
303                "client expected RESULT_PUSH, RESULT_DROP, or FLOW_UPDATE",
304            )),
305        }
306    }
307
308    pub async fn cancel_frame(&mut self, frame_id: u32) -> Result<(), RuntimeError> {
309        let mut header = CommonHeader::new(MessageType::FrameCancel, 0, 0);
310        header.session_id = self.session_id;
311        header.frame_id = frame_id;
312        self.transport
313            .write_packet(&RuntimePacket::new(header, Vec::new(), Vec::new())?)
314            .await
315    }
316
317    pub async fn patch_session(
318        &mut self,
319        patch: SessionPatchMetadata,
320    ) -> Result<SessionPatchAckMetadata, RuntimeError> {
321        let mut header = CommonHeader::new(
322            MessageType::SessionPatch,
323            SESSION_PATCH_METADATA_LEN as u32,
324            patch.profile_patch_bytes,
325        );
326        header.session_id = self.session_id;
327        self.transport
328            .write_packet(&RuntimePacket::new(
329                header,
330                patch.to_bytes()?.to_vec(),
331                Vec::new(),
332            )?)
333            .await?;
334
335        let ack_packet = self.transport.read_packet().await?;
336        if ack_packet.header.message_type != MessageType::SessionPatchAck {
337            return Err(RuntimeError::UnexpectedMessage(
338                "client expected SESSION_PATCH_ACK",
339            ));
340        }
341        self.require_session_packet(&ack_packet, "client received patch ack for another session")?;
342        if ack_packet.metadata.len() != SESSION_PATCH_ACK_METADATA_LEN {
343            return Err(RuntimeError::UnexpectedMessage(
344                "client received malformed SESSION_PATCH_ACK metadata length",
345            ));
346        }
347        Ok(SessionPatchAckMetadata::parse(&ack_packet.metadata)?)
348    }
349
350    pub async fn migrate_transport(
351        &mut self,
352        request: SessionMigrateMetadata,
353    ) -> Result<SessionMigrateAckMetadata, RuntimeError> {
354        let mut header = CommonHeader::new(
355            MessageType::SessionMigrate,
356            SESSION_MIGRATE_METADATA_LEN as u32,
357            0,
358        );
359        header.session_id = self.session_id;
360        self.transport
361            .write_packet(&RuntimePacket::new(
362                header,
363                request.to_bytes()?.to_vec(),
364                Vec::new(),
365            )?)
366            .await?;
367
368        let ack_packet = self.transport.read_packet().await?;
369        if ack_packet.header.message_type != MessageType::SessionMigrateAck {
370            return Err(RuntimeError::UnexpectedMessage(
371                "client expected SESSION_MIGRATE_ACK",
372            ));
373        }
374        self.require_session_packet(
375            &ack_packet,
376            "client received migrate ack for another session",
377        )?;
378        if ack_packet.metadata.len() != SESSION_MIGRATE_ACK_METADATA_LEN {
379            return Err(RuntimeError::UnexpectedMessage(
380                "client received malformed SESSION_MIGRATE_ACK metadata length",
381            ));
382        }
383        let ack = SessionMigrateAckMetadata::parse(&ack_packet.metadata)?;
384        nnrp_core::validate_migration_recovery(&request, &ack)?;
385        Ok(ack)
386    }
387
388    pub fn build_migration_request(
389        &self,
390        new_transport_id: TransportId,
391        last_result_frame_id: u64,
392        client_migrate_ts_us: u64,
393    ) -> SessionMigrateMetadata {
394        SessionMigrateMetadata {
395            old_transport_id: self.transport.transport_kind().transport_id(),
396            new_transport_id,
397            last_result_frame_id,
398            client_migrate_ts_us,
399        }
400    }
401
402    pub async fn close(mut self) -> Result<(), RuntimeError> {
403        let close = SessionCloseMetadata {
404            close_reason: SessionCloseReason::ClientShutdown,
405            in_flight_policy: InFlightPolicy::Drain,
406            drain_timeout_ms: 0,
407            last_operation_id: self.next_frame_id.saturating_sub(1) as u64,
408            session_error_code: SESSION_ERROR_NONE,
409            session_close_tag: self.session_id,
410        };
411        self.close_with(close).await?;
412        self.transport.close().await
413    }
414
415    pub async fn close_with(
416        &mut self,
417        close: SessionCloseMetadata,
418    ) -> Result<SessionCloseAckMetadata, RuntimeError> {
419        let mut header = CommonHeader::new(
420            MessageType::SessionClose,
421            SESSION_CLOSE_METADATA_LEN as u32,
422            0,
423        );
424        header.session_id = self.session_id;
425        self.lifecycle.begin_session_close(&header, &close)?;
426        self.transport
427            .write_packet(&RuntimePacket::new(
428                header,
429                close.to_bytes()?.to_vec(),
430                Vec::new(),
431            )?)
432            .await?;
433
434        let ack_packet = self.transport.read_packet().await?;
435        if ack_packet.header.message_type != MessageType::SessionCloseAck {
436            return Err(RuntimeError::UnexpectedMessage(
437                "client expected SESSION_CLOSE_ACK",
438            ));
439        }
440        if ack_packet.header.session_id != self.session_id {
441            return Err(RuntimeError::UnexpectedMessage(
442                "client received close ack for another session",
443            ));
444        }
445        if ack_packet.metadata.len() != SESSION_CLOSE_ACK_METADATA_LEN {
446            return Err(RuntimeError::UnexpectedMessage(
447                "client received malformed SESSION_CLOSE_ACK metadata length",
448            ));
449        }
450
451        let ack = SessionCloseAckMetadata::parse(&ack_packet.metadata)?;
452        self.lifecycle
453            .apply_session_close_ack(&ack_packet.header, &ack)?;
454        Ok(ack)
455    }
456
    /// Consumes the session and closes the underlying transport without
    /// sending a `SESSION_CLOSE` message first.
    ///
    /// # Errors
    /// Returns whatever the transport's `close` reports.
    pub async fn close_transport(mut self) -> Result<(), RuntimeError> {
        self.transport.close().await
    }
460
461    fn require_session_packet(
462        &self,
463        packet: &RuntimePacket,
464        message: &'static str,
465    ) -> Result<(), RuntimeError> {
466        if packet.header.session_id != self.session_id {
467            return Err(RuntimeError::UnexpectedMessage(message));
468        }
469        Ok(())
470    }
471}
472
473impl fmt::Debug for NnrpClient {
474    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
475        formatter
476            .debug_struct("NnrpClient")
477            .field("transport", &self.transport.transport_kind())
478            .field("config", &self.config)
479            .field("lifecycle", &self.lifecycle)
480            .finish()
481    }
482}
483
484impl fmt::Debug for NnrpClientSession {
485    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
486        formatter
487            .debug_struct("NnrpClientSession")
488            .field("session_id", &self.session_id)
489            .field("next_frame_id", &self.next_frame_id)
490            .field("transport", &self.transport.transport_kind())
491            .field("lifecycle", &self.lifecycle)
492            .finish()
493    }
494}