Skip to main content

heddle_client/grpc_hosted/
hydration.rs

1use std::{
2    net::ToSocketAddrs,
3    sync::{Arc, Mutex, OnceLock, mpsc},
4    thread,
5    time::Duration,
6};
7
8use objects::{
9    error::HeddleError,
10    object::{ChangeId, ContentHash, ThreadName},
11};
12use repo::{BlobHydrator, Repository};
13use wire::ProtocolError;
14
15use super::{HostedAuthMode, HostedGrpcClient, HostedSession};
16
/// Default hosted lazy-hydration deadline.
///
/// This matches the hosted client config's 30s default connection timeout and
/// gives lazy reads a bounded failure mode when a gRPC request stalls without a
/// transport-level TCP timeout.
///
/// In the code below the same value bounds four separate waits: the worker's
/// initial connect, the constructor's wait for the worker's readiness signal,
/// each hydration RPC on the worker, and the caller-side wait for the worker's
/// reply.
const DEFAULT_HOSTED_HYDRATION_TIMEOUT: Duration = Duration::from_secs(30);
23
/// Materialization mode requested for a pull.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PullMaterialization {
    /// Mode that does not permit a partial fetch.
    Full,
    /// Mode that permits a partial fetch.
    Lazy,
}

impl PullMaterialization {
    /// Reports whether this mode permits a partial fetch; only
    /// [`PullMaterialization::Lazy`] does.
    pub(crate) fn allows_partial_fetch(self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly.
        match self {
            Self::Lazy => true,
            Self::Full => false,
        }
    }
}
35
36impl HostedGrpcClient {
37    pub async fn hydrate_pulled_state(
38        &mut self,
39        repo: &Repository,
40        repo_path: &str,
41        remote_thread: &str,
42        target_state: ChangeId,
43    ) -> Result<usize, ProtocolError> {
44        self.hydrate_missing_blobs_for_state(repo, repo_path, remote_thread, target_state)
45            .await
46    }
47}
48
/// Read-time blob hydrator for **hosted** lazy clones (issue #50).
///
/// Plugs into [`repo::Repository::set_blob_hydrator`]: when
/// [`Repository::require_blob`] hits a missing-blob marker left behind by a
/// lazy hosted clone (`heddle clone --lazy <hosted-url>` /
/// `--filter blob:none`), the read path delegates here, this hydrator re-runs
/// the pull with full materialization for the *current* tip of `local_thread`,
/// and the read is retried against the freshly populated store.
///
/// ## Runtime bridge
///
/// `Repository::require_blob` is sync. The underlying gRPC stack is async,
/// and the hydrator must be invocable from BOTH async contexts (the
/// `#[tokio::main]` CLI command path) and plain non-Tokio threads (future
/// FFI callers, test helpers). `Handle::block_on` invoked from within a
/// running Tokio runtime panics ("Cannot start a runtime from within a
/// runtime"), so we cannot bridge in-place.
///
/// Instead, on first use we spawn a dedicated worker thread that owns its
/// own current-thread Tokio runtime + a connected `HostedGrpcClient`. Each
/// `hydrate()` call sends a request over an mpsc channel and blocks on the
/// reply (bounded by the hydration timeout). The worker `block_on`s the gRPC
/// call inside its private runtime, avoiding any nesting. This pattern is
/// robust regardless of what the caller's thread is doing.
pub struct LazyHostedHydrator {
    /// Endpoint spec as `host:port` (or an IP literal). Re-resolved via DNS
    /// on first connect so a hostname behind a load balancer with rotating
    /// IPs still works across process restarts. We deliberately do NOT
    /// store a [`std::net::SocketAddr`] here — that would freeze the IP at
    /// clone time and break later reconnects.
    endpoint: String,
    /// Hosted repository path, forwarded verbatim with every hydrate
    /// request sent to the bridge.
    repo_path: String,
    /// Remote thread name, forwarded verbatim with every hydrate request
    /// sent to the bridge.
    remote_thread: String,
    /// Local thread to resolve to a state on each hydrate. Re-read every
    /// call so a `pull --lazy` that advances the thread tip is honored
    /// without rewriting `lazy-hydrator.toml`.
    local_thread: String,
    /// Lazily constructed worker bridge; empty until the first successful
    /// `ensure_bridge` call, then shared by all later callers.
    bridge: OnceLock<HydrationBridge>,
    /// Held during first-use bridge construction so the connect + spawn
    /// sequence is atomic — N concurrent first-time callers see exactly
    /// one bridge built and shared, rather than N runtimes / N clients
    /// racing via separate `OnceLock::set` calls (the round-2 bug).
    init_lock: Mutex<()>,
}
93
94impl LazyHostedHydrator {
95    pub fn new(
96        endpoint: impl Into<String>,
97        repo_path: impl Into<String>,
98        remote_thread: impl Into<String>,
99        local_thread: impl Into<String>,
100    ) -> Self {
101        Self {
102            endpoint: endpoint.into(),
103            repo_path: repo_path.into(),
104            remote_thread: remote_thread.into(),
105            local_thread: local_thread.into(),
106            bridge: OnceLock::new(),
107            init_lock: Mutex::new(()),
108        }
109    }
110
111    fn ensure_bridge(&self) -> objects::error::Result<&HydrationBridge> {
112        if let Some(bridge) = self.bridge.get() {
113            return Ok(bridge);
114        }
115        // Serialize first-time construction so the runtime, client, and
116        // worker thread are installed as one atomic unit.
117        let _guard = self.init_lock.lock().unwrap_or_else(|poison| {
118            // Prior initializer panicked. The bridge is either set (good)
119            // or absent (caller will retry). Either way clearing the
120            // poison and continuing is correct — we re-check `bridge.get`
121            // below.
122            poison.into_inner()
123        });
124        if let Some(bridge) = self.bridge.get() {
125            return Ok(bridge);
126        }
127
128        let bridge = HydrationBridge::connect(&self.endpoint)?;
129        // The init_lock guarantees no race: `set` must succeed here.
130        self.bridge.set(bridge).map_err(|_| {
131            HeddleError::Config(
132                "lazy hosted hydrator: bridge slot already filled under init_lock — \
133                     this indicates a logic bug in LazyHostedHydrator"
134                    .to_string(),
135            )
136        })?;
137        Ok(self.bridge.get().expect("just set under init_lock"))
138    }
139}
140
141impl BlobHydrator for LazyHostedHydrator {
142    fn hydrate(&self, repo: &Repository, _hash: &ContentHash) -> objects::error::Result<()> {
143        // `_hash` is ignored: `hydrate_pulled_state` refetches every
144        // missing blob reachable from `target_state`, not just one. This
145        // matches the hosted-side strategy that already exists
146        // (sync.rs:541) and is the cheapest correct behaviour given the
147        // partial-fetch metadata records the blake3 only.
148
149        // Re-resolve the target state from the repo on EVERY call. If a
150        // `pull --lazy` advanced the local thread between clone and now,
151        // the cached state would point at the OLD tip and we'd leave any
152        // post-pull missing blobs unresolved — that was the round-2 P1.
153        let target_state = match repo
154            .refs()
155            .get_thread(&ThreadName::from(self.local_thread.as_str()))
156        {
157            Ok(Some(id)) => id,
158            Ok(None) => {
159                return Err(HeddleError::Config(format!(
160                    "lazy hosted hydrator: local thread '{}' has no recorded tip — \
161                     was the lazy clone interrupted? Try `heddle pull --lazy` to refresh.",
162                    self.local_thread,
163                )));
164            }
165            Err(err) => {
166                return Err(HeddleError::Config(format!(
167                    "lazy hosted hydrator: failed to read local thread '{}': {err}",
168                    self.local_thread,
169                )));
170            }
171        };
172
173        let bridge = self.ensure_bridge()?;
174        bridge
175            .hydrate(repo, &self.repo_path, &self.remote_thread, target_state)
176            .map(|_count| ())
177            .map_err(|err| HeddleError::Io(std::io::Error::other(err.to_string())))
178    }
179}
180
/// Background worker bridging sync `BlobHydrator::hydrate` calls to the
/// async gRPC stack. Owns a dedicated current-thread Tokio runtime and a
/// connected `HostedGrpcClient`. Callers reopen the repository root into
/// an owned handle, dispatch hydrate requests over an mpsc channel, and
/// block on a per-request reply channel.
///
/// This indirection is what makes the hydrator safe to call from a
/// `#[tokio::main]` async context: the worker's runtime is private, so the
/// nested `block_on` happens entirely off the caller's runtime.
struct HydrationBridge {
    /// Request channel into the worker. Dropping the last sender makes the
    /// worker's `recv` fail, which is the worker's signal to exit.
    tx: mpsc::Sender<HydrateMessage>,
    /// Join handle for the worker. It is never joined; it is only stored so
    /// the handle lives as long as the bridge. Shutdown is driven by `tx`
    /// being dropped together with the bridge, not by this handle.
    _worker: thread::JoinHandle<()>,
}
196
/// Request sent from a sync caller to the hydration worker thread.
enum HydrateMessage {
    /// Run one hydration RPC and send the outcome back on `reply`.
    Run {
        /// Owned repository handle the worker hydrates into.
        repo: Arc<Repository>,
        /// Hosted repository path passed to the RPC.
        repo_path: String,
        /// Remote thread name passed to the RPC.
        remote_thread: String,
        /// State whose missing blobs should be hydrated.
        target_state: ChangeId,
        /// Per-request channel on which the worker sends the RPC result.
        reply: mpsc::SyncSender<Result<usize, ProtocolError>>,
    },
}
206
impl HydrationBridge {
    /// Resolves `endpoint`, spawns the worker thread, and waits for it to
    /// report a connected client.
    ///
    /// Steps, in order:
    /// 1. Resolve `endpoint` with `to_socket_addrs` and take the first
    ///    address returned.
    /// 2. Load the default user config and build a `HostedSession` from it
    ///    on the calling thread.
    /// 3. Spawn the `heddle-lazy-hydrator` thread, which builds a
    ///    current-thread Tokio runtime, connects through `session.connect`
    ///    under `DEFAULT_HOSTED_HYDRATION_TIMEOUT`, signals readiness, and
    ///    then serves requests until the request channel closes.
    /// 4. Block for at most `DEFAULT_HOSTED_HYDRATION_TIMEOUT` on the
    ///    readiness signal.
    ///
    /// # Errors
    ///
    /// Every failure is reported as `HeddleError::Config`: resolution
    /// failure or empty resolution, config/session load failure, thread
    /// spawn failure, runtime build failure, connect failure or timeout,
    /// a readiness wait that times out, or a worker that exits before
    /// signalling.
    fn connect(endpoint: &str) -> objects::error::Result<Self> {
        // Resolve DNS at connect time so a hostname that's persisted
        // (rather than a frozen IP) re-resolves on every process start.
        // Only the first resolved address is used.
        let addr = endpoint
            .to_socket_addrs()
            .map_err(|err| {
                HeddleError::Config(format!(
                    "lazy hosted hydrator: resolve endpoint '{endpoint}': {err}",
                ))
            })?
            .next()
            .ok_or_else(|| {
                HeddleError::Config(format!(
                    "lazy hosted hydrator: DNS returned no addresses for '{endpoint}'",
                ))
            })?;

        let user_config = cli_shared::UserConfig::load_default().map_err(|err| {
            HeddleError::Config(format!("lazy hosted hydrator: load user config: {err}"))
        })?;
        // Build + validate the session config on this thread so a rejected
        // TLS/auth config surfaces synchronously, before the worker thread is
        // spawned. The worker connects + rotates through `session.connect`.
        let session = HostedSession::build(&user_config, None, HostedAuthMode::ConfigToken)
            .map_err(|err| {
                HeddleError::Config(format!(
                    "lazy hosted hydrator: load TLS/auth client config: {err}"
                ))
            })?;

        // Build the worker thread first so the bridge can store the
        // tx side immediately. The worker's runtime + client are
        // constructed inside the worker (so the runtime's
        // `Handle::current()` matches the thread that drives it).
        let (tx, rx) = mpsc::channel::<HydrateMessage>();
        // Rendezvous channel (capacity 0): the worker's readiness send only
        // completes once this thread is receiving, and fails if this thread
        // has already given up and dropped the receiver.
        let (ready_tx, ready_rx) = mpsc::sync_channel::<Result<(), HeddleError>>(0);
        let endpoint_for_thread = endpoint.to_string();
        let worker = thread::Builder::new()
            .name("heddle-lazy-hydrator".into())
            .spawn(move || {
                // Build the runtime on this thread so all RPCs execute
                // inside it. `current_thread` is sufficient: hydrate
                // calls are serialized through the mpsc channel anyway,
                // and avoiding extra worker threads keeps the resource
                // footprint of an idle lazy clone minimal.
                let runtime = match tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                {
                    Ok(rt) => rt,
                    Err(err) => {
                        let _ = ready_tx.send(Err(HeddleError::Config(format!(
                            "lazy hosted hydrator: build worker runtime: {err}",
                        ))));
                        return;
                    }
                };

                let connect_result = runtime.block_on(async {
                    // `session.connect` connects and runs mandatory rotation
                    // together — the same seam every other hosted entry point
                    // (clone, fetch, push, pull, support, approval) opens
                    // through — so a process whose cached token has slipped
                    // past expiry recovers on first lazy hydrate.
                    let client = match tokio::time::timeout(
                        DEFAULT_HOSTED_HYDRATION_TIMEOUT,
                        session.connect(addr),
                    )
                    .await
                    {
                        Ok(result) => result.map_err(|err: ProtocolError| {
                            HeddleError::Config(format!(
                                "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
                                     (resolved to {addr}): {err}",
                            ))
                        })?,
                        Err(_) => {
                            return Err(HeddleError::Config(format!(
                                "lazy hosted hydrator: connect to '{endpoint_for_thread}' \
                                     (resolved to {addr}) timed out after {}",
                                format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
                            )));
                        }
                    };
                    Ok::<_, HeddleError>(client)
                });
                let mut client = match connect_result {
                    Ok(c) => c,
                    Err(err) => {
                        let _ = ready_tx.send(Err(err));
                        return;
                    }
                };

                // Signal the bridge constructor that connect succeeded
                // BEFORE entering the request loop. From here on there are
                // no more construction errors to report; requests sent via
                // `HydrationBridge::hydrate` will be picked up by the loop
                // below (each RPC can of course still fail on its own). If
                // the constructor already gave up waiting, the send fails
                // and the worker exits without serving anything.
                if ready_tx.send(Ok(())).is_err() {
                    return;
                }

                // Drive the request loop. `recv` returns Err when the
                // last `Sender` is dropped (i.e. the LazyHostedHydrator
                // owning the bridge has been dropped), which is our
                // shutdown signal — we drop the runtime + client and
                // exit.
                //
                // NOTE(review): `rx.recv()` is a blocking std call made
                // inside the async block, so between requests this thread is
                // parked outside the runtime; anything spawned onto this
                // current-thread runtime would only make progress while a
                // request is being awaited — confirm that is acceptable for
                // the client's connection upkeep.
                runtime.block_on(async {
                    while let Ok(message) = rx.recv() {
                        match message {
                            HydrateMessage::Run {
                                repo,
                                repo_path,
                                remote_thread,
                                target_state,
                                reply,
                            } => {
                                let result = hydrate_with_rpc_timeout(
                                    &mut client,
                                    repo.as_ref(),
                                    &repo_path,
                                    &remote_thread,
                                    target_state,
                                    DEFAULT_HOSTED_HYDRATION_TIMEOUT,
                                )
                                .await;
                                // The caller may have timed out and dropped
                                // its receiver; a failed send is ignored.
                                let _ = reply.send(result);
                            }
                        }
                    }
                });
            })
            .map_err(|err| {
                HeddleError::Config(format!("lazy hosted hydrator: spawn worker thread: {err}",))
            })?;

        // Wait for the worker to either confirm connect or report an
        // error. The wait is bounded so a stalled first-use connect cannot
        // wedge the sync read path. On any error path below, `tx` and the
        // join handle are dropped on return; the worker is not joined.
        match ready_rx.recv_timeout(DEFAULT_HOSTED_HYDRATION_TIMEOUT) {
            Ok(Ok(())) => Ok(Self {
                tx,
                _worker: worker,
            }),
            Ok(Err(err)) => Err(err),
            Err(mpsc::RecvTimeoutError::Timeout) => Err(HeddleError::Config(format!(
                "lazy hosted hydrator: worker did not signal readiness within {}",
                format_duration(DEFAULT_HOSTED_HYDRATION_TIMEOUT)
            ))),
            Err(mpsc::RecvTimeoutError::Disconnected) => Err(HeddleError::Config(
                "lazy hosted hydrator: worker thread exited before signalling readiness"
                    .to_string(),
            )),
        }
    }

    /// Dispatches one hydrate request to the worker and waits for the
    /// result, using `DEFAULT_HOSTED_HYDRATION_TIMEOUT` as the reply
    /// deadline. See [`HydrationBridge::hydrate_with_timeout`].
    fn hydrate(
        &self,
        repo: &Repository,
        repo_path: &str,
        remote_thread: &str,
        target_state: ChangeId,
    ) -> Result<usize, ProtocolError> {
        self.hydrate_with_timeout(
            repo,
            repo_path,
            remote_thread,
            target_state,
            DEFAULT_HOSTED_HYDRATION_TIMEOUT,
        )
    }

    /// Dispatches one hydrate request to the worker and blocks for at most
    /// `timeout` waiting for its reply.
    ///
    /// The repository is reopened from `repo.root()` so the worker receives
    /// an owned handle rather than the caller's borrow.
    ///
    /// # Errors
    ///
    /// * the error from `Repository::open`, converted into `ProtocolError`;
    /// * `ProtocolError::Io` if the request channel is closed;
    /// * a `TimedOut` `ProtocolError::Io` if no reply arrives within
    ///   `timeout` — the already-sent request is not cancelled, and the
    ///   worker ignores the failure when it later tries to reply;
    /// * `ProtocolError::Io` if the worker drops the reply channel without
    ///   answering;
    /// * otherwise whatever result the worker produced.
    fn hydrate_with_timeout(
        &self,
        repo: &Repository,
        repo_path: &str,
        remote_thread: &str,
        target_state: ChangeId,
        timeout: Duration,
    ) -> Result<usize, ProtocolError> {
        let repo = Arc::new(Repository::open(repo.root()).map_err(ProtocolError::from)?);

        // Bounded reply channel of capacity 1: the worker's single reply
        // never blocks it, and this caller waits on it for at most
        // `timeout`.
        let (reply_tx, reply_rx) = mpsc::sync_channel::<Result<usize, ProtocolError>>(1);
        self.tx
            .send(HydrateMessage::Run {
                repo,
                repo_path: repo_path.to_string(),
                remote_thread: remote_thread.to_string(),
                target_state,
                reply: reply_tx,
            })
            .map_err(|err| {
                ProtocolError::Io(std::io::Error::other(format!(
                    "lazy hosted hydrator: worker channel closed: {err}",
                )))
            })?;
        match reply_rx.recv_timeout(timeout) {
            Ok(result) => result,
            Err(mpsc::RecvTimeoutError::Timeout) => Err(hydration_timeout_error(
                timeout,
                repo_path,
                remote_thread,
                target_state,
            )),
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                Err(ProtocolError::Io(std::io::Error::other(
                    "lazy hosted hydrator: worker reply channel closed before hydration completed",
                )))
            }
        }
    }
}
422
423async fn hydrate_with_rpc_timeout(
424    client: &mut HostedGrpcClient,
425    repo: &Repository,
426    repo_path: &str,
427    remote_thread: &str,
428    target_state: ChangeId,
429    timeout: Duration,
430) -> Result<usize, ProtocolError> {
431    match tokio::time::timeout(
432        timeout,
433        client.hydrate_pulled_state(repo, repo_path, remote_thread, target_state),
434    )
435    .await
436    {
437        Ok(result) => result,
438        Err(_) => Err(hydration_timeout_error(
439            timeout,
440            repo_path,
441            remote_thread,
442            target_state,
443        )),
444    }
445}
446
447fn hydration_timeout_error(
448    timeout: Duration,
449    repo_path: &str,
450    remote_thread: &str,
451    target_state: ChangeId,
452) -> ProtocolError {
453    ProtocolError::Io(std::io::Error::new(
454        std::io::ErrorKind::TimedOut,
455        format!(
456            "lazy hosted hydrator: blob hydration timed out after {} \
457             (repo={repo_path}, remote_thread={remote_thread}, target_state={target_state})",
458            format_duration(timeout)
459        ),
460    ))
461}
462
/// Renders a duration for error messages.
///
/// Whole-second durations print as `<secs>s`; anything with a sub-second
/// component falls back to the `Debug` rendering of [`Duration`].
fn format_duration(duration: Duration) -> String {
    match duration.subsec_nanos() {
        0 => format!("{}s", duration.as_secs()),
        _ => format!("{duration:?}"),
    }
}
470
471/// Register the `"hosted"` factory in the global lazy-hydrator registry.
472/// Call once at process startup. The factory reads the hosted-section
473/// fields out of `lazy-hydrator.toml` and hands back a
474/// [`LazyHostedHydrator`] adapter that defers the actual gRPC connect (and
475/// worker-thread spawn) until the first `require_blob` call needs it.
476pub fn register_hosted_factory() {
477    use std::{path::Path as StdPath, sync::Arc as StdArc};
478
479    use repo::lazy_hydrator::{
480        BlobHydratorFactory, HydratorSection, KIND_HOSTED, register_factory,
481    };
482
483    let factory: BlobHydratorFactory = StdArc::new(
484        |_root: &StdPath,
485         section: &HydratorSection|
486         -> objects::error::Result<StdArc<dyn BlobHydrator>> {
487            let hosted = section.hosted.as_ref().ok_or_else(|| {
488                HeddleError::Config(
489                    "lazy hosted hydrator: lazy-hydrator.toml has kind=\"hosted\" \
490                     but no [hydrator.hosted] table was found"
491                        .to_string(),
492                )
493            })?;
494            Ok(StdArc::new(LazyHostedHydrator::new(
495                hosted.endpoint.clone(),
496                hosted.repo_path.clone(),
497                hosted.remote_thread.clone(),
498                hosted.local_thread.clone(),
499            )))
500        },
501    );
502    register_factory(KIND_HOSTED, factory);
503}
504
505#[cfg(test)]
506mod tests {
507    //! These tests exercise the lazy-hydrator adapter against a worker
508    //! bridge that connects to a definitely-closed `127.0.0.1:1` endpoint
509    //! via `Endpoint::connect_lazy` — the channel doesn't actually dial
510    //! until the first RPC, at which point it fails predictably with a
511    //! transport-layer error. That's enough to drive the bridge's
512    //! sync→async hand-off, runtime construction, and error propagation
513    //! end-to-end without spinning up an in-process gRPC server.
514    use std::{
515        sync::{
516            Arc,
517            atomic::{AtomicUsize, Ordering},
518            mpsc,
519        },
520        thread,
521        time::{Duration, Instant},
522    };
523
524    use cli_shared::ClientConfig;
525    use grpc::heddle::v1::{
526        auth_service_client::AuthServiceClient, content_service_client::ContentServiceClient,
527        hosted_user_service_client::HostedUserServiceClient,
528        repo_sync_service_client::RepoSyncServiceClient,
529        tree_edit_service_client::TreeEditServiceClient,
530    };
531    use objects::object::{Blob, ChangeId, ThreadName};
532    use repo::Repository;
533    use tempfile::TempDir;
534    use tonic::transport::Endpoint;
535
536    use super::{
537        super::{HostedGrpcClient, helpers::HostedTransportPolicy},
538        BlobHydrator, HydrationBridge, LazyHostedHydrator,
539    };
540
541    /// Build a `HostedGrpcClient` that points at a closed loopback port
542    /// via `connect_lazy`. RPCs fail with a transport error rather than
543    /// hanging. Must be called from inside a tokio runtime context.
544    fn fabricate_offline_client() -> HostedGrpcClient {
545        let endpoint = Endpoint::from_static("http://127.0.0.1:1");
546        let channel = endpoint.connect_lazy();
547        let config = ClientConfig::default();
548        let transport = HostedTransportPolicy::from_client_config(&config);
549        HostedGrpcClient {
550            inner: RepoSyncServiceClient::new(channel.clone()),
551            user: HostedUserServiceClient::new(channel.clone()),
552            auth: AuthServiceClient::new(channel.clone()),
553            content: ContentServiceClient::new(channel.clone()),
554            tree_edit: TreeEditServiceClient::new(channel),
555            token_header: None,
556            transport,
557            auth_proof_key_pem: None,
558            server_key: None,
559            on_human_signature: None,
560        }
561    }
562
563    /// Build the smallest Heddle repo + seed the `main` thread to a real
564    /// state so `hydrate` can resolve `local_thread`.
565    fn temp_repo() -> (TempDir, Repository) {
566        let temp = TempDir::new().expect("temp");
567        let repo = Repository::init_default(temp.path()).expect("init heddle repo");
568        (temp, repo)
569    }
570
571    /// Spawn a `HydrationBridge` with a pre-built offline client, bypassing
572    /// the DNS / connect / credential paths so tests stay hermetic.
573    fn offline_bridge() -> HydrationBridge {
574        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
575        let worker = thread::Builder::new()
576            .name("test-lazy-hydrator".into())
577            .spawn(move || {
578                let runtime = tokio::runtime::Builder::new_current_thread()
579                    .enable_all()
580                    .build()
581                    .expect("worker runtime");
582                let mut client = runtime.block_on(async { fabricate_offline_client() });
583                runtime.block_on(async {
584                    while let Ok(message) = rx.recv() {
585                        match message {
586                            super::HydrateMessage::Run {
587                                repo,
588                                repo_path,
589                                remote_thread,
590                                target_state,
591                                reply,
592                            } => {
593                                let result = client
594                                    .hydrate_pulled_state(
595                                        repo.as_ref(),
596                                        &repo_path,
597                                        &remote_thread,
598                                        target_state,
599                                    )
600                                    .await;
601                                let _ = reply.send(result);
602                            }
603                        }
604                    }
605                });
606            })
607            .expect("spawn test worker");
608        HydrationBridge {
609            tx,
610            _worker: worker,
611        }
612    }
613
614    /// Construct a `LazyHostedHydrator` whose bridge is already installed
615    /// from `offline_bridge`. Bypasses the real `ensure_bridge` connect
616    /// path so we can drive the trait surface deterministically.
617    fn offline_lazy_hydrator(local_thread: &str) -> LazyHostedHydrator {
618        let hydrator = LazyHostedHydrator::new(
619            "ignored.example.test:443",
620            "org/acme/repo",
621            "main",
622            local_thread,
623        );
624        hydrator
625            .bridge
626            .set(offline_bridge())
627            .map_err(|_| ())
628            .expect("set bridge");
629        hydrator
630    }
631
632    /// Round-3 test from the task brief — proves the worker bridge is
633    /// callable from inside a `#[tokio::main]`-style multi-thread async
634    /// context. With the previous design (`Handle::block_on` from the
635    /// outer runtime's thread) this would have panicked.
636    #[test]
637    fn hydrate_safe_from_tokio_main_context() {
638        let runtime = tokio::runtime::Builder::new_multi_thread()
639            .worker_threads(2)
640            .enable_all()
641            .build()
642            .expect("multi-thread runtime");
643        runtime.block_on(async {
644            let (_temp, repo) = temp_repo();
645            let target = repo
646                .refs()
647                .get_thread(&ThreadName::from("main"))
648                .unwrap()
649                .unwrap();
650            // Seed a known thread tip the hydrator can resolve via
651            // `local_thread`.
652            let _ = target;
653
654            let hydrator = offline_lazy_hydrator("main");
655            let blake3 = Blob::new(b"placeholder".to_vec()).hash();
656            // Must not panic. The offline client surfaces a transport
657            // error, which the trait reshapes into a HeddleError::Io. We
658            // assert "non-empty error" rather than pinning tonic wording.
659            let err = hydrator
660                .hydrate(&repo, &blake3)
661                .expect_err("offline endpoint must produce an error");
662            assert!(!err.to_string().is_empty(), "must surface a real error");
663        });
664    }
665
666    /// Round-3 test from the task brief — direct counterpart to the
667    /// Tokio test above. The hydrator must also work on plain non-Tokio
668    /// threads (the future FFI / library-embedder path).
669    #[test]
670    fn hydrate_safe_from_blocking_context() {
671        let (_temp, repo) = temp_repo();
672        let hydrator = offline_lazy_hydrator("main");
673        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
674        let err = hydrator
675            .hydrate(&repo, &blake3)
676            .expect_err("offline endpoint must produce an error");
677        assert!(!err.to_string().is_empty(), "must surface a real error");
678    }
679
680    /// Round-3 test from the task brief. If `target_state` were cached at
681    /// first hydrate (the round-2 bug), the second call against an advanced
682    /// thread tip would hydrate against the OLD state. We exercise both
683    /// the first and second hydrate, and inspect the request the bridge
684    /// processed via an inspection bridge that captures the target_state
685    /// it received.
686    #[test]
687    fn hydrate_after_thread_advance_uses_new_state() {
688        // Build an inspecting bridge: instead of running real RPCs it
689        // records the ChangeId on each request and replies with an
690        // "io error: simulated". That lets us verify the bridge saw the
691        // post-advance ChangeId on the second call.
692        let recorded: Arc<std::sync::Mutex<Vec<ChangeId>>> =
693            Arc::new(std::sync::Mutex::new(Vec::new()));
694        let recorded_for_worker = Arc::clone(&recorded);
695        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
696        let worker = thread::Builder::new()
697            .name("inspect-hydrator".into())
698            .spawn(move || {
699                while let Ok(message) = rx.recv() {
700                    match message {
701                        super::HydrateMessage::Run {
702                            target_state,
703                            reply,
704                            ..
705                        } => {
706                            recorded_for_worker.lock().unwrap().push(target_state);
707                            let _ = reply.send(Err(wire::ProtocolError::Io(
708                                std::io::Error::other("simulated"),
709                            )));
710                        }
711                    }
712                }
713            })
714            .expect("spawn inspect worker");
715        let bridge = HydrationBridge {
716            tx,
717            _worker: worker,
718        };
719
720        let hydrator =
721            LazyHostedHydrator::new("ignored.example.test:443", "org/acme/repo", "main", "main");
722        hydrator.bridge.set(bridge).map_err(|_| ()).expect("set");
723
724        let (_temp, repo) = temp_repo();
725        let first_tip = repo
726            .refs()
727            .get_thread(&ThreadName::from("main"))
728            .unwrap()
729            .unwrap();
730
731        // First hydrate — bridge sees the original tip.
732        let blake3 = Blob::new(b"a".to_vec()).hash();
733        let _ = hydrator.hydrate(&repo, &blake3);
734
735        // Advance the local "main" thread to a fresh, distinct ChangeId.
736        let advanced = ChangeId::generate();
737        assert_ne!(advanced, first_tip, "fresh ChangeId must differ");
738        repo.refs()
739            .set_thread(&ThreadName::from("main"), &advanced)
740            .expect("advance");
741
742        // Second hydrate — bridge MUST see the advanced tip, not the
743        // first one (round-2 cached-state bug regression guard).
744        let _ = hydrator.hydrate(&repo, &blake3);
745
746        let seen = recorded.lock().unwrap().clone();
747        assert_eq!(seen.len(), 2, "two hydrate calls = two recorded states");
748        assert_eq!(seen[0], first_tip, "first call uses original tip");
749        assert_eq!(
750            seen[1], advanced,
751            "second call MUST re-resolve to the advanced tip"
752        );
753    }
754
755    /// Round-3 test from the task brief. With the round-2 design,
756    /// concurrent first-time callers raced two separate `OnceLock::set`
757    /// calls (runtime + inner) and could end up storing an inner whose
758    /// `Handle` referenced a runtime that was dropped by the losing
759    /// thread. Now there's a single OnceLock + an init_lock, so all
760    /// callers observe exactly one bridge.
761    #[test]
762    fn concurrent_first_use_no_race() {
763        const N: usize = 8;
764        let (_temp, repo) = temp_repo();
765        let repo = Arc::new(repo);
766        // The arc allows N threads to share one hydrator that they all
767        // race to initialize.
768        let hydrator = Arc::new(offline_lazy_hydrator("main"));
769        let observed_ok: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
770        let observed_err: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
771
772        let mut handles = Vec::with_capacity(N);
773        for _ in 0..N {
774            let repo = Arc::clone(&repo);
775            let hydrator = Arc::clone(&hydrator);
776            let observed_ok = Arc::clone(&observed_ok);
777            let observed_err = Arc::clone(&observed_err);
778            handles.push(thread::spawn(move || {
779                let blake3 = Blob::new(b"placeholder".to_vec()).hash();
780                match hydrator.hydrate(repo.as_ref(), &blake3) {
781                    Ok(()) => observed_ok.fetch_add(1, Ordering::SeqCst),
782                    Err(_) => observed_err.fetch_add(1, Ordering::SeqCst),
783                };
784            }));
785        }
786        for h in handles {
787            h.join().expect("worker joined");
788        }
789        // Either outcome is fine — the assertion is that no panic /
790        // deadlock occurred and every caller got a reply. The offline
791        // client produces errors, so we expect all N to land in the err
792        // bucket; we accept any split as long as the total is N.
793        let total = observed_ok.load(Ordering::SeqCst) + observed_err.load(Ordering::SeqCst);
794        assert_eq!(total, N, "every concurrent caller must receive a reply");
795    }
796
797    #[test]
798    fn hydrate_times_out_when_worker_never_replies() {
799        let (_temp, repo) = temp_repo();
800        let target = repo
801            .refs()
802            .get_thread(&ThreadName::from("main"))
803            .unwrap()
804            .unwrap();
805        let (tx, rx) = mpsc::channel::<super::HydrateMessage>();
806        let (release_tx, release_rx) = mpsc::sync_channel::<()>(0);
807        let (done_tx, done_rx) = mpsc::sync_channel::<()>(0);
808        let worker = thread::Builder::new()
809            .name("stalling-hydrator".into())
810            .spawn(move || {
811                match rx.recv() {
812                    Ok(super::HydrateMessage::Run { reply, .. }) => {
813                        let _ = release_rx.recv();
814                        drop(reply);
815                    }
816                    Err(_) => {}
817                }
818                let _ = done_tx.send(());
819            })
820            .expect("spawn stalling worker");
821        let bridge = HydrationBridge {
822            tx,
823            _worker: worker,
824        };
825
826        let started = Instant::now();
827        let err = bridge
828            .hydrate_with_timeout(
829                &repo,
830                "org/acme/repo",
831                "main",
832                target,
833                Duration::from_millis(50),
834            )
835            .expect_err("stalled worker must time out");
836        let elapsed = started.elapsed();
837
838        assert!(
839            elapsed < Duration::from_secs(1),
840            "hydrate timeout must return promptly; elapsed {elapsed:?}"
841        );
842        let msg = err.to_string();
843        assert!(
844            msg.contains("blob hydration timed out after") && msg.contains("org/acme/repo"),
845            "timeout error must name the operation and repo context; got: {msg}"
846        );
847
848        release_tx.send(()).expect("release stalled worker");
849        done_rx
850            .recv_timeout(Duration::from_secs(1))
851            .expect("worker exits after release");
852    }
853
854    /// Drop the bridge → worker exits cleanly. Catches the case where a
855    /// future refactor leaks the worker forever.
856    #[test]
857    fn dropping_bridge_shuts_worker_down() {
858        let bridge = offline_bridge();
859        // Pull the worker handle out via a Drop-detecting wrapper isn't
860        // possible without restructuring; instead we observe that
861        // dropping the bridge closes the channel and `send` afterwards
862        // would fail. The cleanest visible assertion: dropping the
863        // bridge does not hang the test.
864        drop(bridge);
865        // Give the worker a moment to wind down on slow CI.
866        thread::sleep(Duration::from_millis(50));
867    }
868
869    /// Force the owned repo handle into the type system. The hydration
870    /// worker must receive an owned `Arc<Repository>` rather than a raw
871    /// borrowed pointer whose lifetime is erased across the mpsc channel.
872    #[test]
873    fn hydration_message_carries_send_owned_repo_handle() {
874        fn assert_send_static<T: Send + 'static>(_: &T) {}
875        let (_temp, repo) = temp_repo();
876        let (reply, _recv) = mpsc::sync_channel::<Result<usize, wire::ProtocolError>>(1);
877        let message = super::HydrateMessage::Run {
878            repo: Arc::new(repo),
879            repo_path: "org/acme/repo".to_string(),
880            remote_thread: "main".to_string(),
881            target_state: ChangeId::generate(),
882            reply,
883        };
884        assert_send_static(&message);
885    }
886
887    #[test]
888    fn hydration_bridge_does_not_reintroduce_raw_repo_pointer() {
889        let source = include_str!("hydration.rs");
890        let raw_wrapper = ["Repo", "Ptr"].concat();
891        let raw_repo_pointer = ["*const ", "Repository"].concat();
892        assert!(
893            !source.contains(&raw_wrapper),
894            "hydration bridge must not reintroduce the raw-pointer send wrapper"
895        );
896        assert!(
897            !source.contains(&raw_repo_pointer),
898            "hydration bridge must not send raw Repository pointers across threads"
899        );
900    }
901
902    /// Round-4 patch-coverage fill: exercise the `hydrate` early-return
903    /// taken when the persisted `local_thread` has no recorded tip in the
904    /// current repo (e.g. the lazy clone was interrupted before the first
905    /// thread write landed). The hydrator must surface this as a clean
906    /// `Config` error rather than calling `ensure_bridge` and dialing the
907    /// network for a state we don't have.
908    #[test]
909    fn hydrate_returns_config_error_when_local_thread_missing() {
910        let (_temp, repo) = temp_repo();
911        // Pre-set the bridge so `ensure_bridge` would succeed if reached —
912        // that way a failure here proves the early-return fired before the
913        // bridge was consulted.
914        let hydrator = offline_lazy_hydrator("thread-that-was-never-written");
915        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
916        let err = hydrator
917            .hydrate(&repo, &blake3)
918            .expect_err("missing thread must surface as Config error");
919        let msg = err.to_string();
920        assert!(
921            msg.contains("no recorded tip") && msg.contains("thread-that-was-never-written"),
922            "error must name the missing thread and explain why hydration was skipped; got: {msg}"
923        );
924    }
925
926    /// Round-4 patch-coverage fill: drive the real `ensure_bridge` path
927    /// (no pre-installed bridge) against an unresolvable hostname. The
928    /// DNS error must propagate back through `HydrationBridge::connect`
929    /// → `ensure_bridge` → `hydrate` rather than panicking or hanging.
930    ///
931    /// The `.invalid` TLD is RFC 2606-reserved and guaranteed never to
932    /// resolve, so this test stays hermetic in CI environments without
933    /// outbound DNS.
934    #[test]
935    fn ensure_bridge_propagates_dns_failure() {
936        let (_temp, repo) = temp_repo();
937        // Note: no `offline_lazy_hydrator` — this constructor leaves
938        // `bridge` empty so the first `hydrate()` exercises the real
939        // ensure_bridge → HydrationBridge::connect path including DNS.
940        let hydrator = LazyHostedHydrator::new(
941            "definitely-nonexistent-host-for-tests.invalid:443",
942            "org/acme/repo",
943            "main",
944            "main",
945        );
946        let blake3 = Blob::new(b"placeholder".to_vec()).hash();
947        let err = hydrator
948            .hydrate(&repo, &blake3)
949            .expect_err("unresolvable endpoint must surface as a Config error");
950        let msg = err.to_string();
951        assert!(
952            msg.contains("resolve endpoint")
953                || msg.contains("DNS returned no addresses")
954                || msg.contains(".invalid"),
955            "error must identify the DNS-resolution failure; got: {msg}"
956        );
957        // Repeat the call — second attempt must also fail-fast (no
958        // half-initialized bridge cached on disk / in OnceLock).
959        let err2 = hydrator
960            .hydrate(&repo, &blake3)
961            .expect_err("second call must also fail rather than reuse a partial bridge");
962        assert!(
963            !err2.to_string().is_empty(),
964            "second call must surface a real error"
965        );
966    }
967}
968
#[cfg(test)]
mod register_factory_tests {
    //! Round-4 patch-coverage fill for `register_hosted_factory` and the
    //! closure it installs in the lazy-hydrator registry. The closure has
    //! two branches and both are covered: a section without the
    //! `[hydrator.hosted]` table yields an error, a section with it
    //! yields a ready-to-install adapter.

    use std::sync::{Mutex, MutexGuard, PoisonError};

    use repo::lazy_hydrator::{HostedHydratorConfig, HydratorSection, KIND_HOSTED, lookup_factory};
    use tempfile::TempDir;

    use super::register_hosted_factory;

    /// Serializes the tests in this module: they all touch the
    /// process-wide hydrator registry under the global `"hosted"` key.
    static REGISTRY_LOCK: Mutex<()> = Mutex::new(());

    /// Takes `REGISTRY_LOCK`, recovering the guard if an earlier test
    /// panicked while holding it (the lock protects no data, so poison
    /// carries no meaning here).
    fn registry_guard() -> MutexGuard<'static, ()> {
        REGISTRY_LOCK
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }

    /// Builds a `kind = hosted` section with the given optional
    /// `[hydrator.hosted]` table and no git-overlay table.
    fn hosted_section(hosted: Option<HostedHydratorConfig>) -> HydratorSection {
        HydratorSection {
            kind: KIND_HOSTED.to_string(),
            hosted,
            git_overlay: None,
        }
    }

    /// Registration alone must make the factory discoverable.
    #[test]
    fn register_hosted_factory_installs_factory_for_kind_hosted() {
        let _guard = registry_guard();
        register_hosted_factory();
        let registered = lookup_factory(KIND_HOSTED).is_some();
        assert!(
            registered,
            "register_hosted_factory must populate the registry under KIND_HOSTED"
        );
    }

    /// With a populated `[hydrator.hosted]` table the factory must hand
    /// back an adapter.
    #[test]
    fn registered_factory_builds_adapter_for_hosted_section() {
        let _guard = registry_guard();
        register_hosted_factory();
        let factory =
            lookup_factory(KIND_HOSTED).expect("factory present after register_hosted_factory");
        let temp = TempDir::new().expect("temp");
        let section = hosted_section(Some(HostedHydratorConfig {
            endpoint: String::from("example.heddle.cloud:443"),
            repo_path: String::from("org/acme/repo"),
            remote_thread: String::from("main"),
            local_thread: String::from("main"),
        }));
        let _hydrator = factory(temp.path(), &section)
            .expect("factory must produce an adapter when [hydrator.hosted] is present");
    }

    /// Without the `[hydrator.hosted]` table the factory must refuse,
    /// and the error has to point at the missing table.
    #[test]
    fn registered_factory_errors_when_hosted_section_absent() {
        let _guard = registry_guard();
        register_hosted_factory();
        let factory = lookup_factory(KIND_HOSTED).expect("factory present");
        let temp = TempDir::new().expect("temp");
        let section = hosted_section(None);
        let Err(err) = factory(temp.path(), &section) else {
            panic!(
                "factory must reject a kind=hosted section that omits the [hydrator.hosted] table"
            );
        };
        let msg = err.to_string();
        assert!(
            msg.contains("[hydrator.hosted]") || msg.contains("hydrator.hosted"),
            "error must name the missing TOML table; got: {msg}"
        );
    }
}
1042
#[cfg(test)]
mod connect_path_tests {
    //! Source-presence test for the credential-rotation invariant. Lazy
    //! hydration must open its session through the shared `HostedSession`
    //! seam — whose `connect` connects and rotates together (guarded by a
    //! source-presence test in `session.rs`) — rather than connecting by
    //! hand and risking a dropped rotation. Without rotation, a process
    //! whose cached token has slipped past expiry hits an auth failure on
    //! first lazy hydrate even though the rotation data is on disk.

    /// Checks that this file contains both the seam's build call and the
    /// connect call on the resulting session.
    ///
    /// `include_str!` pulls in the whole file, this test included. Each
    /// needle is therefore assembled from fragments at runtime: written
    /// as one contiguous literal, the needle would be found in the
    /// test's own source and the assertion could never fail, even if the
    /// production code stopped using the seam.
    #[test]
    fn lazy_hosted_connect_opens_session_through_rotating_seam() {
        let source = include_str!("hydration.rs");
        let build_call = [
            "HostedSession::build(",
            "&user_config, None, HostedAuthMode::ConfigToken)",
        ]
        .concat();
        let connect_call = ["session.", "connect(addr)"].concat();
        assert!(
            source.contains(build_call.as_str()),
            "hydration.rs must build its session through the shared HostedSession seam",
        );
        assert!(
            source.contains(connect_call.as_str()),
            "hydration.rs must connect via HostedSession::connect, which owns rotation",
        );
    }
}
1066
#[cfg(test)]
mod config_persistence_tests {
    //! Tests for the round-3 hostname-vs-IP persistence fix. They sit
    //! next to the hydrator tests because the contract — the endpoint
    //! field stores a host:port string, NOT a resolved SocketAddr — is
    //! enforced at the LazyHostedHydrator boundary.
    use repo::lazy_hydrator::LazyHydratorConfig;
    use tempfile::TempDir;

    /// Saves a hosted config with a hostname endpoint, loads it back,
    /// and checks the endpoint survived unchanged and still is not an
    /// IP:port literal.
    #[test]
    fn lazy_hydrator_config_round_trip_preserves_hostname() {
        // What gets persisted MUST be the hostname spec, never a
        // SocketAddr-formatted IP. clone.rs is the real producer; this
        // test stands in for it.
        const ENDPOINT: &str = "example.heddle.cloud:443";

        let temp = TempDir::new().expect("temp");
        let heddle_dir = temp.path().join(".heddle");
        LazyHydratorConfig::hosted(ENDPOINT, "org/acme/repo", "main", "main")
            .save(&heddle_dir)
            .expect("save");

        let reloaded = LazyHydratorConfig::load(&heddle_dir)
            .expect("load")
            .expect("present");
        let hosted = reloaded
            .hydrator
            .hosted
            .expect("hosted section present after round-trip");

        assert_eq!(
            hosted.endpoint, ENDPOINT,
            "endpoint MUST round-trip as the original hostname:port spec; \
             pinning the IP at clone time would break hosts with rotating IPs"
        );

        // Sanity check: if the stored value parses as a SocketAddr, the
        // producer resolved DNS at save time and the round-2 bug shape
        // is back.
        let as_socket_addr = hosted.endpoint.parse::<std::net::SocketAddr>();
        assert!(
            as_socket_addr.is_err(),
            "persisted endpoint must be a hostname spec, not a SocketAddr literal"
        );
    }
}