Skip to main content

heddle_client/grpc_hosted/
mod.rs

1//! Hosted gRPC client for the transport rewrite.
2
3mod content;
4mod helpers;
5mod hydration;
6pub mod request_signing;
7mod session;
8mod sync;
9mod tree_edit;
10mod user;
11
12use cli_shared::ClientConfig;
13use crypto::{Ed25519Signer, Signer};
14use grpc::heddle::v1::{
15    KeypairProof, MintBiscuitRequest, auth_service_client::AuthServiceClient,
16    content_service_client::ContentServiceClient,
17    hosted_user_service_client::HostedUserServiceClient, mint_biscuit_request::Proof,
18    repo_sync_service_client::RepoSyncServiceClient,
19    tree_edit_service_client::TreeEditServiceClient,
20};
21use objects::{object::MarkerName, store::ObjectStore};
22use repo::Repository;
23use tonic::{
24    Request,
25    metadata::MetadataValue,
26    transport::{Certificate, Channel, ClientTlsConfig, Endpoint},
27};
28use wire::ProtocolError;
29
30use crate::credentials;
31
/// Client for the hosted gRPC services.
///
/// Holds one generated stub per service (all built over clones of the same
/// `Channel` in [`HostedGrpcClient::connect`]) together with the auth material
/// that is stamped onto outgoing requests.
pub struct HostedGrpcClient {
    /// Repo-sync service stub; constructed with a bounded inbound decode size.
    pub(super) inner: RepoSyncServiceClient<Channel>,
    /// Hosted-user service stub.
    pub(super) user: HostedUserServiceClient<Channel>,
    /// Auth service stub; credential rotation calls `mint_biscuit` on it.
    pub(super) auth: AuthServiceClient<Channel>,
    /// Content service stub.
    pub(super) content: ContentServiceClient<Channel>,
    /// Tree-edit service stub.
    pub(super) tree_edit: TreeEditServiceClient<Channel>,
    /// Pre-built `authorization` header value (`Bearer <token>`); `None` when
    /// the client was configured without a token, in which case no auth
    /// metadata is attached at all.
    pub(super) token_header: Option<MetadataValue<tonic::metadata::Ascii>>,
    /// Transport policy derived from the `ClientConfig` at connect time.
    transport: helpers::HostedTransportPolicy,
    /// PEM-encoded Ed25519 key used to sign the `x-heddle-proof` header and
    /// to build the per-request PoP signer; `None` skips both.
    pub(super) auth_proof_key_pem: Option<String>,
    /// The key used to look up this server's credential in the credential
    /// store.  When set, `auto_rotate_if_needed` will use it to read and
    /// update `~/.heddle/credentials.toml` transparently.
    server_key: Option<String>,
    /// App-registered WebAuthn signer invoked when a `human`-tier RPC is
    /// rejected with `x-weft-sig-required: human`. `None` => human-tier RPCs
    /// surface a typed error rather than looping. See
    /// [`request_signing::HumanSignatureCallback`].
    on_human_signature: Option<request_signing::HumanSignatureCallback>,
}
51
52impl HostedGrpcClient {
53    pub async fn connect(
54        addr: std::net::SocketAddr,
55        config: &ClientConfig,
56    ) -> Result<Self, ProtocolError> {
57        let scheme = if config.tls_enabled { "https" } else { "http" };
58        let mut endpoint = Endpoint::from_shared(format!("{scheme}://{addr}"))
59            .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
60        if config.tls_enabled {
61            let mut tls = ClientTlsConfig::new();
62            if let Some(domain_name) = &config.tls_domain_name {
63                tls = tls.domain_name(domain_name.clone());
64            }
65            if let Some(ca_pem) = &config.tls_ca_certificate_pem {
66                tls = tls.ca_certificate(Certificate::from_pem(ca_pem.as_bytes()));
67            }
68            endpoint = endpoint
69                .tls_config(tls)
70                .map_err(|err| ProtocolError::InvalidState(err.to_string()))?;
71        }
72        let channel = endpoint
73            .connect()
74            .await
75            .map_err(|err| ProtocolError::Io(std::io::Error::other(err.to_string())))?;
76        let token_header = config
77            .token
78            .as_ref()
79            .map(|token| MetadataValue::try_from(format!("Bearer {}", token.id)))
80            .transpose()
81            .map_err(|err| ProtocolError::AuthenticationFailed(err.to_string()))?;
82        let transport = helpers::HostedTransportPolicy::from_client_config(config);
83        Ok(Self {
84            // Bound the single-shot, server-controlled sidecar allocation at
85            // the gRPC decode boundary: tonic rejects an oversized inbound
86            // `PullMessage` before its `redactions_blob`/`state_visibility_blob`
87            // `Vec<u8>` is ever materialized. The post-decode
88            // `check_received_transfer_blob_size` calls are kept as cheap
89            // defense-in-depth, but this is the load-bearing guard.
90            inner: RepoSyncServiceClient::new(channel.clone())
91                .max_decoding_message_size(wire::MAX_PULL_DECODE_MESSAGE_SIZE),
92            user: HostedUserServiceClient::new(channel.clone()),
93            auth: AuthServiceClient::new(channel.clone()),
94            content: ContentServiceClient::new(channel.clone()),
95            tree_edit: TreeEditServiceClient::new(channel),
96            token_header,
97            transport,
98            auth_proof_key_pem: config.auth_proof_key_pem.clone(),
99            server_key: config.server_key.clone(),
100            on_human_signature: None,
101        })
102    }
103
104    /// Register the app's WebAuthn signer for the destructive (`human`) tier.
105    ///
106    /// Invoked when a signed RPC is rejected with `x-weft-sig-required: human`;
107    /// the callback produces a [`request_signing::WebAuthnAssertion`] over the
108    /// same action and the call is retried once. With no callback registered, a
109    /// human-tier rejection surfaces a typed error (no loop). The CLI wires a
110    /// terminal-prompt implementation; tapestry a browser ceremony.
111    pub fn with_human_signature_callback(
112        mut self,
113        callback: request_signing::HumanSignatureCallback,
114    ) -> Self {
115        self.on_human_signature = Some(callback);
116        self
117    }
118
119    /// The device signer for request PoP, derived from the same
120    /// `auth_proof_key_pem` the client uses for the `x-heddle-proof` bearer
121    /// proof-of-possession. `None` when the client is anonymous / unauthed —
122    /// signing is then skipped (the server defaults to OBSERVE mode and ignores
123    /// missing signatures on unsigned-tier RPCs).
124    fn device_signer(&self) -> Result<Option<Ed25519Signer>, ProtocolError> {
125        match &self.auth_proof_key_pem {
126            Some(pem) => Ed25519Signer::from_pem(pem)
127                .map(Some)
128                .map_err(|err| ProtocolError::AuthenticationFailed(err.to_string())),
129            None => Ok(None),
130        }
131    }
132
133    /// Stamp bearer auth (token + `x-heddle-proof`) and, for unary requests,
134    /// attach the Tier-1 PoP request signature over the serialized body.
135    ///
136    /// This is the single chokepoint every UNARY authenticated call routes
137    /// through. Streaming call sites (which have no single body to hash) call
138    /// [`Self::apply_auth`] directly instead. Returns the signing context so a
139    /// human-tier retry can re-derive the identical WebAuthn challenge; `None`
140    /// when signing was skipped (anonymous client).
141    pub(in crate::grpc_hosted) fn apply_signed_auth<T: prost::Message>(
142        &self,
143        request: &mut Request<T>,
144        method_path: &str,
145    ) -> Result<Option<request_signing::SignedRequestContext>, ProtocolError> {
146        self.apply_auth(request)?;
147        let Some(signer) = self.device_signer()? else {
148            return Ok(None);
149        };
150        let message_bytes = request.get_ref().encode_to_vec();
151        let ctx = request_signing::attach_pop(request, &signer, method_path, &message_bytes)?;
152        Ok(Some(ctx))
153    }
154
155    /// A human-tier rejection can only be satisfied if the original request was
156    /// PoP-signed (so we have a `SignedRequestContext` to derive the WebAuthn
157    /// challenge from). An anonymous client (no device key) that somehow reaches
158    /// a human-tier RPC has no context — surface a typed error, don't loop.
159    pub(in crate::grpc_hosted) fn require_human_sig_context(
160        &self,
161        ctx: Option<request_signing::SignedRequestContext>,
162    ) -> Result<request_signing::SignedRequestContext, ProtocolError> {
163        ctx.ok_or_else(|| {
164            ProtocolError::AuthorizationFailed(
165                "this action requires user verification, but the client has no device key to \
166                 sign the request; run `heddle auth login` first"
167                    .to_string(),
168            )
169        })
170    }
171
172    /// Invoke the app-registered human-signature callback over the pending
173    /// action. The WebAuthn challenge is client-derived
174    /// (`SHA256(canonical bytes)`) — no server round trip. If no callback is
175    /// registered, surface a typed error rather than looping.
176    pub(in crate::grpc_hosted) fn request_human_signature(
177        &self,
178        method_path: &str,
179        ctx: &request_signing::SignedRequestContext,
180        action_url: Option<String>,
181    ) -> Result<request_signing::WebAuthnAssertion, ProtocolError> {
182        let callback = self.on_human_signature.as_ref().ok_or_else(|| {
183            ProtocolError::AuthorizationFailed(format!(
184                "action {method_path} requires user verification, but no WebAuthn signer is \
185                 configured for this client"
186            ))
187        })?;
188        let challenge = request_signing::human_challenge(&ctx.canonical);
189        let req = request_signing::HumanSignatureRequest {
190            method_path: method_path.to_string(),
191            action_summary: format!("Authorize {method_path}"),
192            challenge,
193            canonical: ctx.canonical.clone(),
194            // Deep-link the server sent on the rejection (weft#338), if any — a display hint
195            // the callback can show; the signed challenge above is unaffected.
196            action_url,
197        };
198        callback(req)
199    }
200
201    pub(super) fn apply_auth<T>(&self, request: &mut Request<T>) -> Result<(), ProtocolError> {
202        if let Some(token) = &self.token_header {
203            request
204                .metadata_mut()
205                .insert("authorization", token.clone());
206            if let Some(pem) = &self.auth_proof_key_pem {
207                let signer = Ed25519Signer::from_pem(pem)
208                    .map_err(|err| ProtocolError::AuthenticationFailed(err.to_string()))?;
209                let raw = token
210                    .to_str()
211                    .map_err(|err| ProtocolError::AuthenticationFailed(err.to_string()))?;
212                let bearer = raw
213                    .strip_prefix("Bearer ")
214                    .or_else(|| raw.strip_prefix("bearer "))
215                    .unwrap_or(raw);
216                let proof_ts = std::time::SystemTime::now()
217                    .duration_since(std::time::UNIX_EPOCH)
218                    .map_err(|err| ProtocolError::AuthenticationFailed(err.to_string()))?
219                    .as_secs()
220                    .to_string();
221                let signature = signer
222                    .sign(format!("{bearer}|{proof_ts}").as_bytes())
223                    .map_err(|err| ProtocolError::AuthenticationFailed(err.to_string()))?;
224                use base64::Engine;
225                let encoded = base64::engine::general_purpose::STANDARD.encode(signature);
226                let proof = MetadataValue::try_from(encoded)
227                    .map_err(|err| ProtocolError::AuthenticationFailed(err.to_string()))?;
228                request.metadata_mut().insert("x-heddle-proof", proof);
229                let proof_ts = MetadataValue::try_from(proof_ts)
230                    .map_err(|err| ProtocolError::AuthenticationFailed(err.to_string()))?;
231                request.metadata_mut().insert("x-heddle-proof-ts", proof_ts);
232            }
233        }
234        Ok(())
235    }
236
237    /// Transparently rotate the credential for this client if it is near expiry.
238    ///
239    /// No-ops if `server_key` was not set on `ClientConfig` at construction
240    /// time, or if no credential is stored for the server, or if the token is
241    /// not within 10 minutes of expiry.
242    pub async fn auto_rotate_if_needed(&mut self) {
243        let server_key = match &self.server_key {
244            Some(k) => k.clone(),
245            None => return,
246        };
247        self.rotate_credential_for_server(&server_key).await;
248    }
249
250    async fn rotate_credential_for_server(&mut self, server_key: &str) {
251        // Load the stored credential.
252        let cred = match credentials::resolve_credential_for_server(server_key) {
253            Ok(Some(c)) => c,
254            Ok(None) => return,
255            Err(err) => {
256                tracing::warn!("credential rotation: failed to load credential: {err}");
257                return;
258            }
259        };
260
261        // Check whether the Biscuit's stored expiry is within the
262        // rotation window.
263        if !credentials::token_needs_rotation(&cred) {
264            return;
265        }
266
267        // We need both `credential_id` (the public key id the server
268        // will look up) and `private_key_pem` (to sign the renewal
269        // proof). Older credentials without one or the other can't
270        // self-renew; the user falls back to `heddle auth login`.
271        let public_key_id = match &cred.credential_id {
272            Some(id) => id.clone(),
273            None => {
274                tracing::debug!("credential rotation: no credential_id stored, skipping");
275                return;
276            }
277        };
278        let private_key_pem = match &cred.private_key_pem {
279            Some(pem) => pem.clone(),
280            None => {
281                tracing::debug!("credential rotation: no private_key_pem stored, skipping");
282                return;
283            }
284        };
285
286        // Sign the canonical renewal challenge:
287        //   "{timestamp}\n{public_key_id}\n{requested_scope}"
288        // Empty `requested_scope` == reuse the keypair owner's
289        // original scope. The server clamps anyway, so a permissive
290        // hint is fine.
291        let signer = match Ed25519Signer::from_pem(&private_key_pem) {
292            Ok(s) => s,
293            Err(err) => {
294                tracing::warn!("credential rotation: failed to load signing key: {err}");
295                return;
296            }
297        };
298        let timestamp = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
299            Ok(d) => d.as_secs(),
300            Err(err) => {
301                tracing::warn!("credential rotation: clock skew: {err}");
302                return;
303            }
304        };
305        let canonical = format!("{timestamp}\n{public_key_id}\n");
306        let signature = match signer.sign(canonical.as_bytes()) {
307            Ok(sig) => sig,
308            Err(err) => {
309                tracing::warn!("credential rotation: failed to sign challenge: {err}");
310                return;
311            }
312        };
313
314        let mut request = Request::new(MintBiscuitRequest {
315            subject: cred.subject.clone(),
316            requested_scope: String::new(),
317            user_agent: String::new(),
318            ip: String::new(),
319            proof: Some(Proof::Keypair(KeypairProof {
320                public_key_id,
321                timestamp,
322                signature,
323            })),
324            client_operation_id: String::new(),
325        });
326        // MintBiscuit is unauthenticated — the proof is the auth.
327        // We deliberately skip `apply_auth` here.
328        let _ = &mut request;
329
330        let response = match self.auth.mint_biscuit(request).await {
331            Ok(r) => r.into_inner(),
332            Err(status) => {
333                tracing::warn!(
334                    "credential rotation: MintBiscuit failed: {} — continuing with existing token",
335                    status.message()
336                );
337                return;
338            }
339        };
340
341        // Format the new expiry as RFC 3339.
342        let expires_at_secs = response
343            .expires_at
344            .as_ref()
345            .map(|t| t.seconds.max(0))
346            .unwrap_or(0);
347        let new_expires_at = if expires_at_secs > 0 {
348            chrono::DateTime::from_timestamp(expires_at_secs, 0)
349                .map(|dt| dt.to_rfc3339())
350                .unwrap_or_else(|| expires_at_secs.to_string())
351        } else {
352            String::new()
353        };
354
355        tracing::debug!(
356            "credential rotation: rotated successfully, new expiry: {}",
357            new_expires_at
358        );
359
360        // Persist the updated credential. The keypair stays the
361        // same — that's the whole point of the keypair-based renewal
362        // model. We replace `token` (the Biscuit) and bump
363        // `expires_at` to the fresh window.
364        let updated = credentials::ServerCredential {
365            token: response.token.clone(),
366            subject: if response.subject.is_empty() {
367                cred.subject.clone()
368            } else {
369                response.subject
370            },
371            device_id: cred.device_id.clone(),
372            credential_id: cred.credential_id.clone(),
373            private_key_pem: Some(private_key_pem),
374            expires_at: if new_expires_at.is_empty() {
375                cred.expires_at.clone()
376            } else {
377                Some(new_expires_at)
378            },
379        };
380
381        if let Err(err) = credentials::store_server_credential(server_key, updated) {
382            tracing::warn!("credential rotation: failed to persist updated credential: {err}");
383            // Don't bail — the in-memory update below still improves the session.
384        }
385
386        // Update the in-memory token header so the remaining RPCs on this
387        // client instance use the fresh token.
388        match MetadataValue::try_from(format!("Bearer {}", response.token)) {
389            Ok(header) => self.token_header = Some(header),
390            Err(err) => {
391                tracing::warn!("credential rotation: failed to set new token header: {err}");
392            }
393        }
394    }
395
396    pub(super) async fn sync_remote_markers(
397        &mut self,
398        repo: &Repository,
399        repo_path: &str,
400        pushed_state: objects::object::ChangeId,
401    ) -> Result<(), ProtocolError> {
402        let remote_markers = self
403            .list_refs(repo_path)
404            .await?
405            .into_iter()
406            .filter(|entry| !entry.is_thread)
407            .map(|entry| (entry.name, entry.change_id))
408            .collect::<std::collections::HashMap<_, _>>();
409        for marker in repo.refs().list_markers()? {
410            let Some(change_id) = repo.refs().get_marker(&marker)? else {
411                continue;
412            };
413            if !wire::is_ancestor(repo.store(), change_id, pushed_state)? {
414                continue;
415            }
416
417            let old_value = remote_markers.get(marker.as_str()).copied();
418            if old_value == Some(change_id) {
419                continue;
420            }
421
422            let result = self
423                .update_ref(repo_path, &marker, false, old_value, change_id, true, None)
424                .await?;
425            if !result.success {
426                return Err(ProtocolError::InvalidState(
427                    result
428                        .error
429                        .unwrap_or_else(|| format!("failed to sync marker '{marker}'")),
430                ));
431            }
432        }
433        Ok(())
434    }
435
436    pub(super) async fn sync_local_markers(
437        &mut self,
438        repo: &Repository,
439        repo_path: &str,
440    ) -> Result<(), ProtocolError> {
441        let remote_markers = self.list_refs(repo_path).await?;
442        for marker in remote_markers.into_iter().filter(|entry| !entry.is_thread) {
443            if !repo.store().has_state(&marker.change_id)? {
444                continue;
445            }
446            let marker_name = MarkerName::from(marker.name.as_str());
447            match repo.refs().get_marker(&marker_name)? {
448                Some(existing) if existing == marker.change_id => {}
449                Some(existing) => repo.refs().set_marker_cas(
450                    &marker_name,
451                    refs::RefExpectation::Value(existing),
452                    &marker.change_id,
453                )?,
454                None => repo.refs().create_marker(&marker_name, &marker.change_id)?,
455            }
456        }
457        Ok(())
458    }
459}
460
461pub use hydration::{LazyHostedHydrator, PullMaterialization, register_hosted_factory};
462pub use session::{HostedAuthMode, HostedSession};
463pub use sync::HostedRefEntry;