Skip to main content

doublecrypt_core/
network_store.rs

1//! Network-backed block store that connects to a `doublecrypt-server` over TLS.
2//!
3//! Uses the 4-byte little-endian length-prefixed protobuf protocol defined in
4//! `proto/blockstore.proto`.  The connection is synchronous (matching the
5//! [`BlockStore`] trait) but supports:
6//!
7//! * **Request pipelining** — [`read_blocks`](BlockStore::read_blocks) and
8//!   [`write_blocks`](BlockStore::write_blocks) send a full batch of requests
9//!   before reading any responses, eliminating per-block round-trip latency.
10//! * **Automatic reconnection** — a single retry on I/O failure with a fresh
11//!   TLS handshake (re-authenticates automatically).
12//! * **Configurable timeouts** — connect, read, and write deadlines.
13//! * **Key-derived authentication** — after the TLS handshake, the client sends
14//!   an `Authenticate` request containing a token derived from the master key
15//!   via HKDF (see [`derive_auth_token`](crate::crypto::derive_auth_token)).
16//!   This proves possession of the encryption key without revealing it.
17//!
18//! # Quick start
19//!
20//! ```no_run
21//! use std::path::Path;
22//! use doublecrypt_core::network_store::NetworkBlockStore;
23//! use doublecrypt_core::block_store::BlockStore;
24//!
25//! let master_key = [0u8; 32];
26//! let store = NetworkBlockStore::connect(
27//!     "127.0.0.1:9100",
28//!     "localhost",
29//!     Path::new("certs/ca.pem"),
30//!     &master_key,
31//! ).expect("connect to server");
32//!
33//! let data = store.read_block(0).expect("read block 0");
34//! ```
35//!
36//! # Builder
37//!
38//! ```no_run
39//! use std::time::Duration;
40//! use doublecrypt_core::network_store::{NetworkBlockStore, NetworkBlockStoreConfig};
41//! use doublecrypt_core::block_store::BlockStore;
42//!
43//! let master_key = [0u8; 32];
44//! let store = NetworkBlockStore::from_config(
45//!     NetworkBlockStoreConfig::new("10.0.0.5:9100", "block-server")
46//!         .ca_cert("certs/ca.pem")
47//!         .auth_token(&master_key)
48//!         .connect_timeout(Duration::from_secs(5))
49//!         .io_timeout(Duration::from_secs(60)),
50//! ).expect("connect to server");
51//! ```
52
53use std::io::{BufReader, Read, Write};
54use std::net::{TcpStream, ToSocketAddrs};
55use std::path::{Path, PathBuf};
56use std::sync::atomic::{AtomicU64, Ordering};
57use std::sync::{Arc, Mutex};
58use std::time::Duration;
59
60use prost::Message;
61use rustls::pki_types::{CertificateDer, ServerName};
62use rustls::{ClientConfig, ClientConnection, StreamOwned};
63
64use crate::block_store::BlockStore;
65use crate::crypto;
66use crate::error::{FsError, FsResult};
67use crate::proto;
68
/// Maximum number of requests to pipeline before reading responses.
///
/// Keeps TCP buffer usage bounded and avoids deadlocks when the kernel
/// send/receive buffers are smaller than the total pipelined payload.
/// The pipelined read and write helpers split their input into chunks of
/// at most this many requests.
const PIPELINE_BATCH: usize = 64;
74
75// ── Configuration ───────────────────────────────────────────
76
/// Connection parameters for a [`NetworkBlockStore`].
///
/// Created with [`NetworkBlockStoreConfig::new`] and refined through the
/// chained setter methods, each of which consumes and returns the config.
pub struct NetworkBlockStoreConfig {
    /// Server address in `"host:port"` form; resolved each time a connection is opened.
    addr: String,
    /// TLS server name (SNI) used when creating the TLS client connection.
    server_name: String,
    /// Path of the PEM file holding the trusted CA certificate(s).
    ca_cert: PathBuf,
    /// 32-byte token sent in the `Authenticate` request.
    auth_token: [u8; 32],
    /// Deadline for establishing the TCP connection.
    connect_timeout: Duration,
    /// Read and write timeout applied to the TCP socket.
    io_timeout: Duration,
}
86
87impl NetworkBlockStoreConfig {
88    /// Create a config targeting `addr` (`"host:port"`) with the given TLS
89    /// server name (SNI).  Timeouts default to 10 s (connect) and 30 s (I/O).
90    pub fn new(addr: impl Into<String>, server_name: impl Into<String>) -> Self {
91        Self {
92            addr: addr.into(),
93            server_name: server_name.into(),
94            ca_cert: PathBuf::new(),
95            auth_token: [0u8; 32],
96            connect_timeout: Duration::from_secs(10),
97            io_timeout: Duration::from_secs(30),
98        }
99    }
100
101    pub fn ca_cert(mut self, path: impl Into<PathBuf>) -> Self {
102        self.ca_cert = path.into();
103        self
104    }
105
106    /// Set the auth token by deriving it from the given master key.
107    pub fn auth_token(mut self, master_key: &[u8]) -> Self {
108        self.auth_token = crypto::derive_auth_token(master_key)
109            .expect("HKDF auth-token derivation should not fail with valid key material");
110        self
111    }
112
113    /// Set a pre-derived auth token directly.
114    pub fn auth_token_raw(mut self, token: [u8; 32]) -> Self {
115        self.auth_token = token;
116        self
117    }
118
119    pub fn connect_timeout(mut self, d: Duration) -> Self {
120        self.connect_timeout = d;
121        self
122    }
123
124    pub fn io_timeout(mut self, d: Duration) -> Self {
125        self.io_timeout = d;
126        self
127    }
128}
129
130// ── Store ───────────────────────────────────────────────────
131
/// A [`BlockStore`] backed by a remote `doublecrypt-server` reached over TLS
/// with key-derived authentication.
///
/// On construction the client opens a TLS connection, sends an
/// `Authenticate` request carrying the configured auth token, and then
/// issues a `GetInfo` RPC to learn the block size and total block count.
/// The connection is stored and reused; if it breaks, one automatic
/// reconnect (including re-authentication) is attempted.
pub struct NetworkBlockStore {
    /// Connection parameters, retained so the store can reconnect.
    config: NetworkBlockStoreConfig,
    /// TLS client configuration shared by every connection this store opens.
    tls_config: Arc<ClientConfig>,
    /// Cached connection; `None` means one must be established before the
    /// next request. The mutex serializes all traffic on the connection.
    stream: Mutex<Option<StreamOwned<ClientConnection, TcpStream>>>,
    /// Block size reported by the server's `GetInfo` reply.
    block_size: usize,
    /// Total block count reported by the server's `GetInfo` reply.
    total_blocks: u64,
    /// Source of request IDs for requests issued after construction.
    next_request_id: AtomicU64,
}
148
149impl NetworkBlockStore {
150    /// Connect to a `doublecrypt-server` using TLS with key-derived
151    /// authentication (convenience wrapper around [`from_config`](Self::from_config)).
152    pub fn connect(
153        addr: &str,
154        server_name: &str,
155        ca_cert: &Path,
156        master_key: &[u8],
157    ) -> FsResult<Self> {
158        Self::from_config(
159            NetworkBlockStoreConfig::new(addr, server_name)
160                .ca_cert(ca_cert)
161                .auth_token(master_key),
162        )
163    }
164
165    /// Connect using a [`NetworkBlockStoreConfig`].
166    pub fn from_config(config: NetworkBlockStoreConfig) -> FsResult<Self> {
167        let tls_config = build_client_tls_config(&config.ca_cert)?;
168
169        // Establish the initial connection.
170        let mut stream = establish_connection(&config, &tls_config)?;
171
172        // Authenticate with the key-derived token.
173        authenticate(&mut stream, &config.auth_token)?;
174
175        // Issue GetInfo to learn block geometry.
176        let req = proto::Request {
177            request_id: 2,
178            command: Some(proto::request::Command::GetInfo(proto::GetInfoRequest {})),
179        };
180        send_message(&mut stream, &req)?;
181        let resp = recv_message(&mut stream)?;
182
183        let (block_size, total_blocks) = match resp.result {
184            Some(proto::response::Result::GetInfo(info)) => {
185                (info.block_size as usize, info.total_blocks)
186            }
187            Some(proto::response::Result::Error(e)) => {
188                return Err(FsError::Internal(format!(
189                    "server error on GetInfo: {}",
190                    e.message
191                )))
192            }
193            _ => return Err(FsError::Internal("unexpected response to GetInfo".into())),
194        };
195
196        Ok(Self {
197            config,
198            tls_config,
199            stream: Mutex::new(Some(stream)),
200            block_size,
201            total_blocks,
202            next_request_id: AtomicU64::new(3),
203        })
204    }
205
206    /// Allocate a monotonically increasing request ID.
207    fn next_id(&self) -> u64 {
208        self.next_request_id.fetch_add(1, Ordering::Relaxed)
209    }
210
211    /// Establish a fresh TLS connection using stored config, including
212    /// re-authentication.
213    fn reconnect(&self) -> FsResult<StreamOwned<ClientConnection, TcpStream>> {
214        let mut stream = establish_connection(&self.config, &self.tls_config)?;
215        authenticate(&mut stream, &self.config.auth_token)?;
216        Ok(stream)
217    }
218
219    /// Send a single request and receive its response, retrying once on I/O
220    /// failure by reconnecting.
221    fn roundtrip(&self, req: &proto::Request) -> FsResult<proto::Response> {
222        let mut guard = self
223            .stream
224            .lock()
225            .map_err(|e| FsError::Internal(e.to_string()))?;
226
227        // Ensure we have a live connection.
228        if guard.is_none() {
229            *guard = Some(self.reconnect()?);
230        }
231
232        let stream = guard.as_mut().unwrap();
233        match send_and_recv(stream, req) {
234            Ok(resp) => Ok(resp),
235            Err(_) => {
236                // Connection may be dead — reconnect and retry once.
237                *guard = Some(self.reconnect()?);
238                send_and_recv(guard.as_mut().unwrap(), req)
239            }
240        }
241    }
242
243    // ── Pipelined helpers ───────────────────────────────────
244
245    /// Pipeline a batch of read requests on `stream`.
246    fn pipeline_reads(
247        &self,
248        stream: &mut StreamOwned<ClientConnection, TcpStream>,
249        block_ids: &[u64],
250    ) -> FsResult<Vec<Vec<u8>>> {
251        let mut results = Vec::with_capacity(block_ids.len());
252
253        for chunk in block_ids.chunks(PIPELINE_BATCH) {
254            // Send all requests in this batch.
255            for &block_id in chunk {
256                let id = self.next_id();
257                send_message(
258                    stream,
259                    &proto::Request {
260                        request_id: id,
261                        command: Some(proto::request::Command::ReadBlock(
262                            proto::ReadBlockRequest { block_id },
263                        )),
264                    },
265                )?;
266            }
267
268            // Read all responses.
269            for _ in chunk {
270                let resp = recv_message(stream)?;
271                match resp.result {
272                    Some(proto::response::Result::ReadBlock(r)) => results.push(r.data),
273                    Some(proto::response::Result::Error(e)) => {
274                        return Err(FsError::Internal(format!("server: {}", e.message)));
275                    }
276                    _ => return Err(FsError::Internal("unexpected response".into())),
277                }
278            }
279        }
280
281        Ok(results)
282    }
283
284    /// Pipeline a batch of write requests on `stream`.
285    fn pipeline_writes(
286        &self,
287        stream: &mut StreamOwned<ClientConnection, TcpStream>,
288        blocks: &[(u64, &[u8])],
289    ) -> FsResult<()> {
290        for chunk in blocks.chunks(PIPELINE_BATCH) {
291            for &(block_id, data) in chunk {
292                let id = self.next_id();
293                send_message(
294                    stream,
295                    &proto::Request {
296                        request_id: id,
297                        command: Some(proto::request::Command::WriteBlock(
298                            proto::WriteBlockRequest {
299                                block_id,
300                                data: data.to_vec(),
301                            },
302                        )),
303                    },
304                )?;
305            }
306
307            for _ in chunk {
308                let resp = recv_message(stream)?;
309                match resp.result {
310                    Some(proto::response::Result::WriteBlock(_)) => {}
311                    Some(proto::response::Result::Error(e)) => {
312                        return Err(FsError::Internal(format!("server: {}", e.message)));
313                    }
314                    _ => return Err(FsError::Internal("unexpected response".into())),
315                }
316            }
317        }
318
319        Ok(())
320    }
321
322    /// Run a pipelined operation with one reconnect attempt on failure.
323    fn with_pipeline<F, T>(&self, op: F) -> FsResult<T>
324    where
325        F: Fn(&Self, &mut StreamOwned<ClientConnection, TcpStream>) -> FsResult<T>,
326    {
327        let mut guard = self
328            .stream
329            .lock()
330            .map_err(|e| FsError::Internal(e.to_string()))?;
331
332        if guard.is_none() {
333            *guard = Some(self.reconnect()?);
334        }
335
336        match op(self, guard.as_mut().unwrap()) {
337            Ok(v) => Ok(v),
338            Err(_) => {
339                *guard = Some(self.reconnect()?);
340                op(self, guard.as_mut().unwrap())
341            }
342        }
343    }
344}
345
346impl BlockStore for NetworkBlockStore {
347    fn block_size(&self) -> usize {
348        self.block_size
349    }
350
351    fn total_blocks(&self) -> u64 {
352        self.total_blocks
353    }
354
355    fn read_block(&self, block_id: u64) -> FsResult<Vec<u8>> {
356        let id = self.next_id();
357        let req = proto::Request {
358            request_id: id,
359            command: Some(proto::request::Command::ReadBlock(
360                proto::ReadBlockRequest { block_id },
361            )),
362        };
363        let resp = self.roundtrip(&req)?;
364
365        match resp.result {
366            Some(proto::response::Result::ReadBlock(r)) => Ok(r.data),
367            Some(proto::response::Result::Error(e)) => {
368                Err(FsError::Internal(format!("server: {}", e.message)))
369            }
370            _ => Err(FsError::Internal("unexpected response".into())),
371        }
372    }
373
374    fn write_block(&self, block_id: u64, data: &[u8]) -> FsResult<()> {
375        let id = self.next_id();
376        let req = proto::Request {
377            request_id: id,
378            command: Some(proto::request::Command::WriteBlock(
379                proto::WriteBlockRequest {
380                    block_id,
381                    data: data.to_vec(),
382                },
383            )),
384        };
385        let resp = self.roundtrip(&req)?;
386
387        match resp.result {
388            Some(proto::response::Result::WriteBlock(_)) => Ok(()),
389            Some(proto::response::Result::Error(e)) => {
390                Err(FsError::Internal(format!("server: {}", e.message)))
391            }
392            _ => Err(FsError::Internal("unexpected response".into())),
393        }
394    }
395
396    fn sync(&self) -> FsResult<()> {
397        let id = self.next_id();
398        let req = proto::Request {
399            request_id: id,
400            command: Some(proto::request::Command::Sync(proto::SyncRequest {})),
401        };
402        let resp = self.roundtrip(&req)?;
403
404        match resp.result {
405            Some(proto::response::Result::Sync(_)) => Ok(()),
406            Some(proto::response::Result::Error(e)) => {
407                Err(FsError::Internal(format!("server: {}", e.message)))
408            }
409            _ => Err(FsError::Internal("unexpected response".into())),
410        }
411    }
412
413    fn read_blocks(&self, block_ids: &[u64]) -> FsResult<Vec<Vec<u8>>> {
414        if block_ids.is_empty() {
415            return Ok(Vec::new());
416        }
417        self.with_pipeline(|s, stream| s.pipeline_reads(stream, block_ids))
418    }
419
420    fn write_blocks(&self, blocks: &[(u64, &[u8])]) -> FsResult<()> {
421        if blocks.is_empty() {
422            return Ok(());
423        }
424        self.with_pipeline(|s, stream| s.pipeline_writes(stream, blocks))
425    }
426}
427
428// ── Authentication ──────────────────────────────────────────
429
430/// Send an Authenticate request and verify the server accepts it.
431fn authenticate(
432    stream: &mut StreamOwned<ClientConnection, TcpStream>,
433    auth_token: &[u8; 32],
434) -> FsResult<()> {
435    let req = proto::Request {
436        request_id: 2,
437        command: Some(proto::request::Command::Authenticate(
438            proto::AuthenticateRequest {
439                auth_token: auth_token.to_vec(),
440            },
441        )),
442    };
443    send_message(stream, &req)?;
444    let resp = recv_message(stream)?;
445
446    match resp.result {
447        Some(proto::response::Result::Authenticate(_)) => Ok(()),
448        Some(proto::response::Result::Error(e)) => Err(FsError::Internal(format!(
449            "authentication failed: {}",
450            e.message
451        ))),
452        _ => Err(FsError::Internal(
453            "unexpected response to Authenticate".into(),
454        )),
455    }
456}
457
458// ── Wire helpers ────────────────────────────────────────────
459
460fn send_message<W: Write>(w: &mut W, msg: &proto::Request) -> FsResult<()> {
461    let payload = msg.encode_to_vec();
462    let len = payload.len() as u32;
463    w.write_all(&len.to_le_bytes())
464        .map_err(|e| FsError::Internal(format!("write length prefix: {e}")))?;
465    w.write_all(&payload)
466        .map_err(|e| FsError::Internal(format!("write payload: {e}")))?;
467    w.flush()
468        .map_err(|e| FsError::Internal(format!("flush: {e}")))?;
469    Ok(())
470}
471
472fn recv_message<R: Read>(r: &mut R) -> FsResult<proto::Response> {
473    let mut len_buf = [0u8; 4];
474    r.read_exact(&mut len_buf)
475        .map_err(|e| FsError::Internal(format!("read length prefix: {e}")))?;
476    let len = u32::from_le_bytes(len_buf) as usize;
477
478    if len > 16 * 1024 * 1024 {
479        return Err(FsError::Internal(format!(
480            "response too large: {len} bytes"
481        )));
482    }
483
484    let mut buf = vec![0u8; len];
485    r.read_exact(&mut buf)
486        .map_err(|e| FsError::Internal(format!("read payload: {e}")))?;
487
488    proto::Response::decode(&*buf).map_err(|e| FsError::Internal(format!("decode response: {e}")))
489}
490
491fn send_and_recv(
492    stream: &mut StreamOwned<ClientConnection, TcpStream>,
493    req: &proto::Request,
494) -> FsResult<proto::Response> {
495    send_message(stream, req)?;
496    recv_message(stream)
497}
498
499// ── Connection establishment ────────────────────────────────
500
501fn establish_connection(
502    config: &NetworkBlockStoreConfig,
503    tls_config: &Arc<ClientConfig>,
504) -> FsResult<StreamOwned<ClientConnection, TcpStream>> {
505    let addr = config
506        .addr
507        .to_socket_addrs()
508        .map_err(|e| FsError::Internal(format!("resolve {}: {e}", config.addr)))?
509        .next()
510        .ok_or_else(|| FsError::Internal(format!("no addresses for {}", config.addr)))?;
511
512    let tcp = TcpStream::connect_timeout(&addr, config.connect_timeout)
513        .map_err(|e| FsError::Internal(format!("connect to {}: {e}", config.addr)))?;
514
515    tcp.set_read_timeout(Some(config.io_timeout))
516        .map_err(|e| FsError::Internal(format!("set read timeout: {e}")))?;
517    tcp.set_write_timeout(Some(config.io_timeout))
518        .map_err(|e| FsError::Internal(format!("set write timeout: {e}")))?;
519
520    let sni = ServerName::try_from(config.server_name.clone())
521        .map_err(|e| FsError::Internal(format!("invalid SNI '{}': {e}", config.server_name)))?;
522
523    let tls_conn = ClientConnection::new(Arc::clone(tls_config), sni)
524        .map_err(|e| FsError::Internal(format!("TLS connection: {e}")))?;
525
526    Ok(StreamOwned::new(tls_conn, tcp))
527}
528
529// ── TLS configuration ───────────────────────────────────────
530
531fn build_client_tls_config(ca_path: &Path) -> FsResult<Arc<ClientConfig>> {
532    let ca_pem = std::fs::read(ca_path)
533        .map_err(|e| FsError::Internal(format!("read CA cert {}: {e}", ca_path.display())))?;
534    let ca_certs: Vec<CertificateDer<'static>> =
535        rustls_pemfile::certs(&mut BufReader::new(&*ca_pem))
536            .collect::<std::result::Result<Vec<_>, _>>()
537            .map_err(|e| FsError::Internal(format!("parse CA certs: {e}")))?;
538
539    let mut root_store = rustls::RootCertStore::empty();
540    for cert in ca_certs {
541        root_store
542            .add(cert)
543            .map_err(|e| FsError::Internal(format!("add CA cert: {e}")))?;
544    }
545
546    let config = ClientConfig::builder()
547        .with_root_certificates(root_store)
548        .with_no_client_auth();
549
550    Ok(Arc::new(config))
551}