Skip to main content

hyperdb_api_core/client/
sync_stream.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Sync stream abstraction for multiple transport types.
5//!
6//! This module provides [`SyncStream`], an enum that can hold different
7//! sync stream types (TCP, Unix Domain Socket) while implementing the
8//! necessary I/O traits.
9
10use std::io::{self, Read, Write};
11use std::net::TcpStream;
12
13#[cfg(unix)]
14use std::os::unix::net::UnixStream;
15
16#[cfg(windows)]
17use std::fs::File;
18
19/// A sync stream that can be either TCP or Unix Domain Socket.
20///
21/// This enum provides a unified interface for different transport mechanisms,
22/// allowing [`Client`](crate::client::Client) to work with both TCP and
23/// Unix Domain Sockets transparently.
24#[derive(Debug)]
25pub enum SyncStream {
26    /// TCP stream for network connections.
27    Tcp(TcpStream),
28
29    /// Unix Domain Socket stream for local IPC (Unix only).
30    #[cfg(unix)]
31    Unix(UnixStream),
32
33    /// Windows Named Pipe stream for local IPC (Windows only).
34    #[cfg(windows)]
35    NamedPipe(File),
36}
37
38impl SyncStream {
39    /// Creates a new TCP stream wrapper.
40    #[must_use]
41    pub fn tcp(stream: TcpStream) -> Self {
42        SyncStream::Tcp(stream)
43    }
44
45    /// Creates a new Unix Domain Socket stream wrapper.
46    #[cfg(unix)]
47    #[must_use]
48    pub fn unix(stream: UnixStream) -> Self {
49        SyncStream::Unix(stream)
50    }
51
52    /// Returns true if this is a TCP stream.
53    #[must_use]
54    pub fn is_tcp(&self) -> bool {
55        matches!(self, SyncStream::Tcp(_))
56    }
57
58    /// Returns true if this is a Unix Domain Socket stream.
59    #[cfg(unix)]
60    #[must_use]
61    pub fn is_unix(&self) -> bool {
62        matches!(self, SyncStream::Unix(_))
63    }
64
65    /// Creates a new Windows Named Pipe stream wrapper.
66    #[cfg(windows)]
67    pub fn named_pipe(file: File) -> Self {
68        SyncStream::NamedPipe(file)
69    }
70
71    /// Returns true if this is a Windows Named Pipe stream.
72    #[cfg(windows)]
73    pub fn is_named_pipe(&self) -> bool {
74        matches!(self, SyncStream::NamedPipe(_))
75    }
76
77    /// Sets `TCP_NODELAY` option (only applicable for TCP streams).
78    ///
79    /// # Errors
80    ///
81    /// Returns an [`io::Error`] from the underlying
82    /// [`std::net::TcpStream::set_nodelay`] when the socket option
83    /// cannot be applied. Unix-domain and named-pipe variants are
84    /// no-ops that always return `Ok(())`.
85    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
86        match self {
87            SyncStream::Tcp(stream) => stream.set_nodelay(nodelay),
88            #[cfg(unix)]
89            SyncStream::Unix(_) => Ok(()), // No-op for Unix sockets
90            #[cfg(windows)]
91            SyncStream::NamedPipe(_) => Ok(()), // No-op for Named Pipes
92        }
93    }
94
95    /// Sets read timeout.
96    ///
97    /// # Errors
98    ///
99    /// Returns an [`io::Error`] from the underlying transport's
100    /// `set_read_timeout` call. On Windows, the named-pipe variant is
101    /// a no-op that always returns `Ok(())`.
102    pub fn set_read_timeout(&self, dur: Option<std::time::Duration>) -> io::Result<()> {
103        match self {
104            SyncStream::Tcp(stream) => stream.set_read_timeout(dur),
105            #[cfg(unix)]
106            SyncStream::Unix(stream) => stream.set_read_timeout(dur),
107            #[cfg(windows)]
108            SyncStream::NamedPipe(_) => Ok(()), // Named pipes don't support timeouts directly
109        }
110    }
111
112    /// Sets write timeout.
113    ///
114    /// # Errors
115    ///
116    /// Returns an [`io::Error`] from the underlying transport's
117    /// `set_write_timeout` call. On Windows, the named-pipe variant
118    /// is a no-op that always returns `Ok(())`.
119    pub fn set_write_timeout(&self, dur: Option<std::time::Duration>) -> io::Result<()> {
120        match self {
121            SyncStream::Tcp(stream) => stream.set_write_timeout(dur),
122            #[cfg(unix)]
123            SyncStream::Unix(stream) => stream.set_write_timeout(dur),
124            #[cfg(windows)]
125            SyncStream::NamedPipe(_) => Ok(()), // Named pipes don't support timeouts directly
126        }
127    }
128
129    /// Returns the local address for TCP streams, or a placeholder for Unix sockets.
130    #[must_use]
131    pub fn local_addr_string(&self) -> String {
132        match self {
133            SyncStream::Tcp(stream) => stream
134                .local_addr()
135                .map_or_else(|_| "unknown".to_string(), |a| a.to_string()),
136            #[cfg(unix)]
137            SyncStream::Unix(stream) => stream
138                .local_addr()
139                .ok()
140                .and_then(|a| a.as_pathname().map(|p| p.display().to_string()))
141                .unwrap_or_else(|| "unix-socket".to_string()),
142            #[cfg(windows)]
143            SyncStream::NamedPipe(_) => "named-pipe".to_string(),
144        }
145    }
146
147    /// Returns the peer address for TCP streams, or a placeholder for Unix sockets.
148    #[must_use]
149    pub fn peer_addr_string(&self) -> String {
150        match self {
151            SyncStream::Tcp(stream) => stream
152                .peer_addr()
153                .map_or_else(|_| "unknown".to_string(), |a| a.to_string()),
154            #[cfg(unix)]
155            SyncStream::Unix(stream) => stream
156                .peer_addr()
157                .ok()
158                .and_then(|a| a.as_pathname().map(|p| p.display().to_string()))
159                .unwrap_or_else(|| "unix-socket".to_string()),
160            #[cfg(windows)]
161            SyncStream::NamedPipe(_) => "named-pipe".to_string(),
162        }
163    }
164
165    /// Attempts to clone the stream.
166    ///
167    /// # Errors
168    ///
169    /// Returns an [`io::Error`] from the underlying transport's
170    /// `try_clone` call — typically because the OS refused to
171    /// duplicate the descriptor.
172    pub fn try_clone(&self) -> io::Result<Self> {
173        match self {
174            SyncStream::Tcp(stream) => Ok(SyncStream::Tcp(stream.try_clone()?)),
175            #[cfg(unix)]
176            SyncStream::Unix(stream) => Ok(SyncStream::Unix(stream.try_clone()?)),
177            #[cfg(windows)]
178            SyncStream::NamedPipe(file) => Ok(SyncStream::NamedPipe(file.try_clone()?)),
179        }
180    }
181}
182
183impl Read for SyncStream {
184    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
185        match self {
186            SyncStream::Tcp(stream) => stream.read(buf),
187            #[cfg(unix)]
188            SyncStream::Unix(stream) => stream.read(buf),
189            #[cfg(windows)]
190            SyncStream::NamedPipe(file) => file.read(buf),
191        }
192    }
193}
194
195impl Write for SyncStream {
196    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
197        match self {
198            SyncStream::Tcp(stream) => stream.write(buf),
199            #[cfg(unix)]
200            SyncStream::Unix(stream) => stream.write(buf),
201            #[cfg(windows)]
202            SyncStream::NamedPipe(file) => file.write(buf),
203        }
204    }
205
206    fn flush(&mut self) -> io::Result<()> {
207        match self {
208            SyncStream::Tcp(stream) => stream.flush(),
209            #[cfg(unix)]
210            SyncStream::Unix(stream) => stream.flush(),
211            #[cfg(windows)]
212            SyncStream::NamedPipe(file) => file.flush(),
213        }
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    #[expect(
220        clippy::assertions_on_constants,
221        reason = "compile-time invariant check kept as an assert for readability at the call site"
222    )]
223    #[test]
224    fn test_sync_stream_variants_exist() {
225        // We can't easily create streams without connecting,
226        // so we just verify the module compiles correctly
227        assert!(true);
228    }
229}