1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::sync::{Arc, Mutex, RwLock};
3use std::time::Duration as StdDuration;
4
5use async_trait::async_trait;
6use reqwest::StatusCode;
7use serde::Deserialize;
8use serde_json::{json, Map as JsonMap, Value as JsonValue};
9use time::{Duration, OffsetDateTime};
10use tokio::sync::watch;
11use tokio::task::JoinHandle;
12
13use crate::connectors::{
14 ActivationHandle, ClientError, Connector, ConnectorClient, ConnectorCtx, ConnectorError,
15 HmacSignatureStyle, ProviderPayloadSchema, RawInbound, TriggerBinding, TriggerKind,
16};
17use crate::secrets::{SecretId, SecretVersion};
18use crate::triggers::{
19 redact_headers, HeaderRedactionPolicy, ProviderId, ProviderPayload, SignatureStatus, TraceId,
20 TriggerEvent, TriggerEventId,
21};
22
23#[cfg(test)]
24mod tests;
25
/// Provider identifier this connector registers under.
pub const LINEAR_PROVIDER_ID: &str = "linear";
/// GraphQL endpoint used when a call does not supply `api_base_url`.
const DEFAULT_API_BASE_URL: &str = "https://api.linear.app/graphql";
/// Base timestamp window (seconds) handed to signature verification; the
/// per-binding grace is added on top of it.
const DEFAULT_REPLAY_WINDOW_SECS: i64 = 60;
/// Grace (seconds) used when a binding configures none.
const DEFAULT_REPLAY_GRACE_SECS: i64 = 15;
/// Seconds between webhook-monitor health probes when no interval is configured.
const DEFAULT_WEBHOOK_MONITOR_PROBE_INTERVAL_SECS: u64 = 60;
/// Consecutive healthy probes required before the monitor tries to re-enable
/// the webhook, when no threshold is configured.
const DEFAULT_WEBHOOK_MONITOR_SUCCESS_THRESHOLD: u32 = 5;
/// Complexity score at or above which `graphql_meta` sets `complexity_warning`.
const COMPLEXITY_WARNING_THRESHOLD: i64 = 5_000;
33
/// Connector for the `linear` provider: normalizes inbound webhooks and
/// exposes an outbound GraphQL client that shares the same state.
pub struct LinearConnector {
    /// Provider id returned by `Connector::provider_id`.
    provider_id: ProviderId,
    /// Trigger kinds returned by `Connector::kinds` (`new` sets `"webhook"`).
    kinds: Vec<TriggerKind>,
    /// State shared with the outbound client and the monitor tasks.
    state: Arc<LinearConnectorState>,
    /// Outbound client handed out by `Connector::client`.
    client: Arc<LinearClient>,
}
40
/// Mutable connector state shared between `LinearConnector` and `LinearClient`.
#[derive(Default)]
struct LinearConnectorState {
    /// Context stored by `init`; `None` until the connector is initialized.
    ctx: RwLock<Option<ConnectorCtx>>,
    /// Activated bindings keyed by binding id; replaced wholesale by `activate`.
    bindings: RwLock<HashMap<String, ActivatedLinearBinding>>,
    /// Join handles of the spawned webhook monitor tasks.
    monitor_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Sender used to tell the monitor tasks to stop (`true` means stop).
    monitor_shutdown: Mutex<Option<watch::Sender<bool>>>,
}
48
/// A trigger binding after its config has been parsed and validated.
#[derive(Clone, Debug)]
struct ActivatedLinearBinding {
    #[allow(dead_code)]
    binding_id: String,
    /// Optional `match.path`; activation rejects two bindings with the same path.
    path: Option<String>,
    /// Secret holding the webhook signing key.
    signing_secret: SecretId,
    /// Extra seconds added to the default replay window (negative values are
    /// clamped to zero when the window is computed).
    replay_grace_secs: i64,
    /// Webhook health monitor, when one is configured and not disabled.
    monitor: Option<LinearWebhookMonitor>,
}
58
/// Outbound GraphQL client for Linear; also used by the webhook monitor.
struct LinearClient {
    /// Provider id used to scope the rate limiter.
    provider_id: ProviderId,
    /// State shared with the connector (source of the `ConnectorCtx`).
    state: Arc<LinearConnectorState>,
    /// HTTP client used for GraphQL requests and health probes.
    http: reqwest::Client,
}
64
/// Deserialized shape of a trigger binding's `config` value.
#[derive(Debug, Default, Deserialize)]
struct LinearBindingConfig {
    /// Matching options, read from the `match` key.
    #[serde(default, rename = "match")]
    match_config: LinearMatchConfig,
    /// Secret references (signing secret and optional API credentials).
    #[serde(default)]
    secrets: LinearSecretsConfig,
    /// Alternative location for the replay grace settings.
    #[serde(default)]
    security: LinearSecurityConfig,
    /// Top-level replay grace; takes precedence over the `security` values.
    #[serde(default)]
    replay_grace_secs: Option<i64>,
    /// Optional webhook health monitor configuration.
    #[serde(default)]
    monitor: Option<LinearWebhookMonitorConfig>,
}
78
/// Contents of a binding's `match` section.
#[derive(Debug, Default, Deserialize)]
struct LinearMatchConfig {
    /// Path associated with the binding; must be unique across active bindings.
    path: Option<String>,
}
83
/// Secret references, written as strings that `parse_secret_id` understands.
#[derive(Clone, Debug, Default, Deserialize)]
struct LinearSecretsConfig {
    /// Reference to the webhook signing secret (required for bindings).
    signing_secret: Option<String>,
    /// Reference to a secret whose value is sent as a `Bearer` token.
    access_token: Option<String>,
    /// Reference to a secret whose value is sent as the raw `Authorization` header.
    api_key: Option<String>,
}
90
/// Contents of a binding's `security` section; both fields are fallbacks for
/// the top-level `replay_grace_secs`, consulted in declaration order.
#[derive(Debug, Default, Deserialize)]
struct LinearSecurityConfig {
    replay_grace_secs: Option<i64>,
    timestamp_grace_secs: Option<i64>,
}
96
/// Endpoint and credential arguments accepted by every outbound call and by
/// the monitor config; resolved by `resolve_client_config_args`.
#[derive(Clone, Debug, Default, Deserialize)]
struct LinearClientConfigArgs {
    /// Overrides `DEFAULT_API_BASE_URL` when present.
    api_base_url: Option<String>,
    /// Inline API key, sent verbatim as the `Authorization` header.
    api_key: Option<String>,
    /// Secret reference for the API key.
    api_key_secret: Option<String>,
    /// Inline access token, sent as `Bearer <token>`.
    access_token: Option<String>,
    /// Secret reference for the access token.
    access_token_secret: Option<String>,
    /// Nested secret references, used when the `*_secret` fields are absent.
    #[serde(default)]
    secrets: LinearSecretsConfig,
}
107
/// Raw configuration of the webhook health monitor for one binding.
#[derive(Clone, Debug, Default, Deserialize)]
struct LinearWebhookMonitorConfig {
    /// `Some(false)` disables the monitor; any other value leaves it enabled.
    #[serde(default)]
    enabled: Option<bool>,
    /// Id of the Linear webhook to re-enable (required when enabled).
    webhook_id: Option<String>,
    /// URL probed with a GET request (required when enabled).
    health_url: Option<String>,
    /// Probe interval in seconds; ignored when `probe_interval_ms` is set.
    #[serde(default)]
    probe_interval_secs: Option<u64>,
    /// Probe interval in milliseconds; takes precedence over the seconds value.
    #[serde(default)]
    probe_interval_ms: Option<u64>,
    /// Consecutive healthy probes required before re-enabling the webhook.
    #[serde(default)]
    success_threshold: Option<u32>,
    /// Client/auth arguments, flattened into the monitor object.
    #[serde(flatten)]
    client: LinearClientConfigArgs,
}
123
/// Client configuration after defaults and credential precedence are applied.
#[derive(Clone, Debug)]
struct ResolvedLinearClientConfig {
    /// URL that GraphQL requests are POSTed to.
    api_base_url: String,
    /// Where the `Authorization` header value comes from.
    auth: LinearAuthSource,
}
129
/// Source of the `Authorization` header value. API keys are sent verbatim;
/// access tokens are prefixed with `Bearer `.
#[derive(Clone, Debug)]
enum LinearAuthSource {
    /// API key supplied inline in the call arguments.
    ApiKeyInline(String),
    /// API key loaded from the secret store.
    ApiKeySecret(SecretId),
    /// Access token supplied inline in the call arguments.
    AccessTokenInline(String),
    /// Access token loaded from the secret store.
    AccessTokenSecret(SecretId),
}
137
/// Validated webhook monitor settings consumed by `run_webhook_monitor`.
#[derive(Clone, Debug)]
struct LinearWebhookMonitor {
    /// Id of the webhook passed to the `webhookUpdate` mutation.
    webhook_id: String,
    /// URL probed on every tick.
    health_url: String,
    /// Delay between probes (at least one second or one millisecond).
    probe_interval: StdDuration,
    /// Consecutive healthy probes required before re-enabling (at least 1).
    success_threshold: u32,
    /// Endpoint and credentials used for the re-enable mutation.
    client_config: ResolvedLinearClientConfig,
}
146
/// Arguments of the `list_issues` outbound method.
#[derive(Debug, Deserialize)]
struct ListIssuesArgs {
    /// Endpoint and credential arguments, flattened into the top-level object.
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// Forwarded as the `$filter` GraphQL variable.
    #[serde(default)]
    filter: Option<JsonValue>,
    /// Forwarded as the `$first` GraphQL variable.
    #[serde(default)]
    first: Option<i64>,
    /// Forwarded as the `$after` GraphQL variable.
    #[serde(default)]
    after: Option<String>,
    /// Forwarded as the `$includeArchived` GraphQL variable.
    #[serde(default)]
    include_archived: Option<bool>,
}
160
/// Arguments of the `update_issue` outbound method.
#[derive(Debug, Deserialize)]
struct UpdateIssueArgs {
    /// Endpoint and credential arguments, flattened into the top-level object.
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// Forwarded as the `$id` GraphQL variable.
    id: String,
    /// Forwarded unchanged as the `$input` GraphQL variable.
    changes: JsonValue,
}
168
/// Arguments of the `create_comment` outbound method.
#[derive(Debug, Deserialize)]
struct CreateCommentArgs {
    /// Endpoint and credential arguments, flattened into the top-level object.
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// Sent as `input.issueId`.
    issue_id: String,
    /// Sent as `input.body`.
    body: String,
}
176
/// Arguments of the `search` outbound method.
#[derive(Debug, Deserialize)]
struct SearchArgs {
    /// Endpoint and credential arguments, flattened into the top-level object.
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// Search text, sent as `$query` (or `$term` on the fallback document).
    query: String,
    /// Forwarded as the `$first` GraphQL variable.
    #[serde(default)]
    first: Option<i64>,
}
185
/// Arguments of the raw `graphql` outbound method.
#[derive(Debug, Deserialize)]
struct GraphqlArgs {
    /// Endpoint and credential arguments, flattened into the top-level object.
    #[serde(flatten)]
    config: LinearClientConfigArgs,
    /// GraphQL document sent as-is.
    query: String,
    /// Optional variables; omitted from the request when absent or null.
    #[serde(default)]
    variables: Option<JsonValue>,
    /// Optional `operationName` for the request.
    #[serde(default)]
    operation_name: Option<String>,
}
196
197impl LinearConnector {
198 pub fn new() -> Self {
199 let state = Arc::new(LinearConnectorState::default());
200 let client = Arc::new(LinearClient {
201 provider_id: ProviderId::from(LINEAR_PROVIDER_ID),
202 state: state.clone(),
203 http: crate::connectors::outbound_http_client("harn-linear-connector"),
204 });
205 Self {
206 provider_id: ProviderId::from(LINEAR_PROVIDER_ID),
207 kinds: vec![TriggerKind::from("webhook")],
208 state,
209 client,
210 }
211 }
212
213 fn binding_for_raw(&self, raw: &RawInbound) -> Result<ActivatedLinearBinding, ConnectorError> {
214 let bindings = self
215 .state
216 .bindings
217 .read()
218 .expect("linear connector bindings poisoned");
219 if let Some(binding_id) = raw.metadata.get("binding_id").and_then(JsonValue::as_str) {
220 return bindings.get(binding_id).cloned().ok_or_else(|| {
221 ConnectorError::Unsupported(format!(
222 "linear connector has no active binding `{binding_id}`"
223 ))
224 });
225 }
226 if bindings.len() == 1 {
227 return bindings
228 .values()
229 .next()
230 .cloned()
231 .ok_or_else(|| ConnectorError::Activation("linear bindings missing".to_string()));
232 }
233 Err(ConnectorError::Unsupported(
234 "linear connector requires raw.metadata.binding_id when multiple bindings are active"
235 .to_string(),
236 ))
237 }
238
239 fn ctx(&self) -> Result<ConnectorCtx, ConnectorError> {
240 self.state
241 .ctx
242 .read()
243 .expect("linear connector ctx poisoned")
244 .clone()
245 .ok_or_else(|| {
246 ConnectorError::Activation(
247 "linear connector must be initialized before use".to_string(),
248 )
249 })
250 }
251
252 fn stop_monitors(&self) {
253 if let Some(shutdown) = self
254 .state
255 .monitor_shutdown
256 .lock()
257 .expect("linear connector monitor shutdown poisoned")
258 .take()
259 {
260 let _ = shutdown.send(true);
261 }
262 let mut tasks = self
263 .state
264 .monitor_tasks
265 .lock()
266 .expect("linear connector monitor tasks poisoned");
267 for task in tasks.drain(..) {
268 task.abort();
269 }
270 }
271}
272
273impl Default for LinearConnector {
274 fn default() -> Self {
275 Self::new()
276 }
277}
278
279impl Drop for LinearConnector {
280 fn drop(&mut self) {
281 self.stop_monitors();
282 }
283}
284
285#[async_trait]
286impl Connector for LinearConnector {
287 fn provider_id(&self) -> &ProviderId {
288 &self.provider_id
289 }
290
291 fn kinds(&self) -> &[TriggerKind] {
292 &self.kinds
293 }
294
295 async fn init(&mut self, ctx: ConnectorCtx) -> Result<(), ConnectorError> {
296 *self
297 .state
298 .ctx
299 .write()
300 .expect("linear connector ctx poisoned") = Some(ctx);
301 Ok(())
302 }
303
304 async fn activate(
305 &self,
306 bindings: &[TriggerBinding],
307 ) -> Result<ActivationHandle, ConnectorError> {
308 self.stop_monitors();
309 let mut configured = HashMap::new();
310 let mut paths = BTreeSet::new();
311 for binding in bindings {
312 let activated = ActivatedLinearBinding::from_binding(binding)?;
313 if let Some(path) = &activated.path {
314 if !paths.insert(path.clone()) {
315 return Err(ConnectorError::Activation(format!(
316 "linear connector path `{path}` is configured by multiple bindings"
317 )));
318 }
319 }
320 configured.insert(binding.binding_id.clone(), activated);
321 }
322 let (monitor_shutdown, _) = watch::channel(false);
323 *self
324 .state
325 .bindings
326 .write()
327 .expect("linear connector bindings poisoned") = configured;
328 *self
329 .state
330 .monitor_shutdown
331 .lock()
332 .expect("linear connector monitor shutdown poisoned") = Some(monitor_shutdown.clone());
333 let tasks = {
334 let bindings = self
335 .state
336 .bindings
337 .read()
338 .expect("linear connector bindings poisoned");
339 bindings
340 .values()
341 .filter_map(|binding| {
342 binding.monitor.clone().map(|monitor| {
343 let client = self.client.clone();
344 let shutdown = monitor_shutdown.subscribe();
345 tokio::spawn(async move {
346 run_webhook_monitor(client, monitor, shutdown).await;
347 })
348 })
349 })
350 .collect::<Vec<_>>()
351 };
352 *self
353 .state
354 .monitor_tasks
355 .lock()
356 .expect("linear connector monitor tasks poisoned") = tasks;
357 Ok(ActivationHandle::new(
358 self.provider_id.clone(),
359 bindings.len(),
360 ))
361 }
362
363 async fn shutdown(&self, deadline: StdDuration) -> Result<(), ConnectorError> {
364 if let Some(shutdown) = self
365 .state
366 .monitor_shutdown
367 .lock()
368 .expect("linear connector monitor shutdown poisoned")
369 .take()
370 {
371 let _ = shutdown.send(true);
372 }
373 let pending = self
374 .state
375 .monitor_tasks
376 .lock()
377 .expect("linear connector monitor tasks poisoned")
378 .drain(..)
379 .collect::<Vec<_>>();
380 if pending.is_empty() {
381 return Ok(());
382 }
383 let wait_all = async {
384 for task in pending {
385 let _ = task.await;
386 }
387 };
388 tokio::time::timeout(deadline, wait_all)
389 .await
390 .map_err(|_| {
391 ConnectorError::Activation(format!(
392 "linear connector shutdown exceeded {}s",
393 deadline.as_secs()
394 ))
395 })?;
396 Ok(())
397 }
398
399 async fn normalize_inbound(&self, raw: RawInbound) -> Result<TriggerEvent, ConnectorError> {
400 let ctx = self.ctx()?;
401 let binding = self.binding_for_raw(&raw)?;
402 let provider = self.provider_id.clone();
403 let received_at = raw.received_at;
404 let headers = effective_headers(&raw.headers);
405 let secret = load_secret_text_blocking(&ctx, &binding.signing_secret)?;
406 let timestamp_window =
407 Duration::seconds(DEFAULT_REPLAY_WINDOW_SECS + binding.replay_grace_secs.max(0));
408 let verify = futures::executor::block_on(crate::connectors::hmac::verify_hmac_signed(
409 ctx.event_log.as_ref(),
410 &provider,
411 HmacSignatureStyle::linear(),
412 &raw.body,
413 &headers,
414 secret.as_str(),
415 Some(timestamp_window),
416 received_at,
417 ));
418 if let Err(error) = verify {
419 if matches!(error, ConnectorError::TimestampOutOfWindow { .. }) {
420 ctx.metrics.record_linear_timestamp_rejection();
421 }
422 return Err(error);
423 }
424
425 let payload = raw.json_body()?;
426 let dedupe_key = header_value(&headers, "linear-delivery")
427 .map(ToString::to_string)
428 .or_else(|| {
429 payload
430 .get("webhookId")
431 .and_then(JsonValue::as_str)
432 .map(ToString::to_string)
433 })
434 .unwrap_or_else(|| fallback_body_digest(&raw.body));
435 let kind = linear_trigger_kind(&payload);
436 let provider_payload = ProviderPayload::normalize(&provider, &kind, &headers, payload)
437 .map_err(|error| ConnectorError::Unsupported(error.to_string()))?;
438 Ok(TriggerEvent {
439 id: TriggerEventId::new(),
440 provider,
441 kind,
442 received_at,
443 occurred_at: raw
444 .occurred_at
445 .or_else(|| infer_occurred_at(&provider_payload)),
446 dedupe_key,
447 trace_id: TraceId::new(),
448 tenant_id: raw.tenant_id.clone(),
449 headers: redact_headers(&headers, &HeaderRedactionPolicy::default()),
450 batch: None,
451 raw_body: Some(raw.body.clone()),
452 provider_payload,
453 signature_status: SignatureStatus::Verified,
454 dedupe_claimed: false,
455 })
456 }
457
458 fn payload_schema(&self) -> ProviderPayloadSchema {
459 ProviderPayloadSchema::named("LinearEventPayload")
460 }
461
462 fn client(&self) -> Arc<dyn ConnectorClient> {
463 self.client.clone()
464 }
465}
466
467#[async_trait]
468impl ConnectorClient for LinearClient {
469 async fn call(&self, method: &str, args: JsonValue) -> Result<JsonValue, ClientError> {
470 match method {
471 "list_issues" => {
472 let args: ListIssuesArgs = parse_args(args)?;
473 let config = self.resolve_client_config(&args.config)?;
474 let envelope = self
475 .request_graphql(
476 &config,
477 "query ListIssues($filter: IssueFilter, $first: Int, $after: String, $includeArchived: Boolean) { issues(filter: $filter, first: $first, after: $after, includeArchived: $includeArchived) { nodes { id identifier title priority estimate dueDate url createdAt updatedAt state { id name type } team { id key name } assignee { id name } project { id name } cycle { id name } labels { nodes { id name } } } pageInfo { hasNextPage endCursor } } }",
478 json!({
479 "filter": args.filter,
480 "first": args.first,
481 "after": args.after,
482 "includeArchived": args.include_archived,
483 }),
484 Some("ListIssues"),
485 )
486 .await?;
487 extract_graphql_field(envelope, "issues")
488 }
489 "update_issue" => {
490 let args: UpdateIssueArgs = parse_args(args)?;
491 let config = self.resolve_client_config(&args.config)?;
492 let envelope = self
493 .request_graphql(
494 &config,
495 "mutation UpdateIssue($id: String!, $input: IssueUpdateInput!) { issueUpdate(id: $id, input: $input) { success issue { id identifier title priority estimate dueDate url updatedAt state { id name type } assignee { id name } project { id name } cycle { id name } labels { nodes { id name } } } } }",
496 json!({
497 "id": args.id,
498 "input": args.changes,
499 }),
500 Some("UpdateIssue"),
501 )
502 .await?;
503 extract_graphql_field(envelope, "issueUpdate")
504 }
505 "create_comment" => {
506 let args: CreateCommentArgs = parse_args(args)?;
507 let config = self.resolve_client_config(&args.config)?;
508 let envelope = self
509 .request_graphql(
510 &config,
511 "mutation CreateComment($input: CommentCreateInput!) { commentCreate(input: $input) { success comment { id body url createdAt user { id name } issue { id identifier title } } } }",
512 json!({
513 "input": {
514 "issueId": args.issue_id,
515 "body": args.body,
516 }
517 }),
518 Some("CreateComment"),
519 )
520 .await?;
521 extract_graphql_field(envelope, "commentCreate")
522 }
523 "search" => {
524 let args: SearchArgs = parse_args(args)?;
525 let config = self.resolve_client_config(&args.config)?;
526 self.search_issues(&config, &args.query, args.first).await
527 }
528 "graphql" => {
529 let args: GraphqlArgs = parse_args(args)?;
530 let config = self.resolve_client_config(&args.config)?;
531 self.request_graphql(
532 &config,
533 &args.query,
534 args.variables.unwrap_or(JsonValue::Null),
535 args.operation_name.as_deref(),
536 )
537 .await
538 }
539 other => Err(ClientError::MethodNotFound(format!(
540 "linear connector does not implement outbound method `{other}`"
541 ))),
542 }
543 }
544}
545
546impl LinearClient {
547 fn resolve_client_config(
548 &self,
549 args: &LinearClientConfigArgs,
550 ) -> Result<ResolvedLinearClientConfig, ClientError> {
551 resolve_client_config_args(args).map_err(ClientError::InvalidArgs)
552 }
553
554 fn ctx(&self) -> Result<ConnectorCtx, ClientError> {
555 self.state
556 .ctx
557 .read()
558 .expect("linear connector ctx poisoned")
559 .clone()
560 .ok_or_else(|| ClientError::Other("linear connector must be initialized".to_string()))
561 }
562
563 async fn request_graphql(
564 &self,
565 config: &ResolvedLinearClientConfig,
566 query: &str,
567 variables: JsonValue,
568 operation_name: Option<&str>,
569 ) -> Result<JsonValue, ClientError> {
570 let ctx = self.ctx()?;
571 ctx.rate_limiter
572 .scoped(&self.provider_id, "graphql")
573 .acquire()
574 .await;
575
576 let auth = self.auth_header(config).await?;
577 let mut request_body = JsonMap::new();
578 request_body.insert("query".to_string(), JsonValue::String(query.to_string()));
579 if !variables.is_null() {
580 request_body.insert("variables".to_string(), variables);
581 }
582 if let Some(operation_name) = operation_name {
583 request_body.insert(
584 "operationName".to_string(),
585 JsonValue::String(operation_name.to_string()),
586 );
587 }
588
589 let response = self
590 .http
591 .post(config.api_base_url.clone())
592 .header("Content-Type", "application/json")
593 .header("Authorization", auth)
594 .json(&JsonValue::Object(request_body))
595 .send()
596 .await
597 .map_err(|error| ClientError::Transport(error.to_string()))?;
598
599 let status = response.status();
600 let meta = graphql_meta(response.headers(), query);
601 let payload = response
602 .json::<JsonValue>()
603 .await
604 .map_err(|error| ClientError::Transport(error.to_string()))?;
605 let errors = payload.get("errors").cloned();
606 if is_rate_limited(status, errors.as_ref()) {
607 return Err(ClientError::RateLimited(graphql_error_message(
608 status,
609 errors.as_ref(),
610 )));
611 }
612 if !status.is_success() {
613 return Err(ClientError::Transport(graphql_error_message(
614 status,
615 errors.as_ref(),
616 )));
617 }
618
619 Ok(json!({
620 "data": payload.get("data").cloned().unwrap_or(JsonValue::Null),
621 "errors": errors.unwrap_or(JsonValue::Null),
622 "meta": meta,
623 }))
624 }
625
626 async fn auth_header(
627 &self,
628 config: &ResolvedLinearClientConfig,
629 ) -> Result<String, ClientError> {
630 match &config.auth {
631 LinearAuthSource::ApiKeyInline(value) => Ok(value.clone()),
632 LinearAuthSource::AccessTokenInline(value) => Ok(format!("Bearer {value}")),
633 LinearAuthSource::ApiKeySecret(secret_id) => {
634 let secret = self.secret_text(secret_id).await?;
635 Ok(secret)
636 }
637 LinearAuthSource::AccessTokenSecret(secret_id) => {
638 let secret = self.secret_text(secret_id).await?;
639 Ok(format!("Bearer {secret}"))
640 }
641 }
642 }
643
644 async fn secret_text(&self, secret_id: &SecretId) -> Result<String, ClientError> {
645 let ctx = self.ctx()?;
646 let secret = ctx
647 .secrets
648 .get(secret_id)
649 .await
650 .map_err(|error| ClientError::Other(error.to_string()))?;
651 Ok(secret.with_exposed(|bytes| String::from_utf8_lossy(bytes).to_string()))
652 }
653
654 async fn search_issues(
655 &self,
656 config: &ResolvedLinearClientConfig,
657 query: &str,
658 first: Option<i64>,
659 ) -> Result<JsonValue, ClientError> {
660 let searches = [
661 (
662 "query SearchIssues($query: String!, $first: Int) { searchIssues(query: $query, first: $first) { nodes { id identifier title url priority state { id name type } team { id key name } } } }",
663 "SearchIssues",
664 ),
665 (
666 "query SearchIssues($term: String!, $first: Int) { searchIssues(term: $term, first: $first) { nodes { id identifier title url priority state { id name type } team { id key name } } } }",
667 "SearchIssues",
668 ),
669 ];
670
671 for (index, (document, operation_name)) in searches.into_iter().enumerate() {
672 let variables = if document.contains("$query") {
673 json!({ "query": query, "first": first })
674 } else {
675 json!({ "term": query, "first": first })
676 };
677 match self
678 .request_graphql(config, document, variables, Some(operation_name))
679 .await
680 {
681 Ok(envelope) => return extract_graphql_field(envelope, "searchIssues"),
682 Err(ClientError::Transport(message))
683 if index == 0
684 && (message.contains("Unknown argument")
685 || message.contains("GRAPHQL_VALIDATION_FAILED")) => {}
686 Err(error) => return Err(error),
687 }
688 }
689
690 Err(ClientError::Transport(
691 "linear connector searchIssues query failed".to_string(),
692 ))
693 }
694
695 async fn probe_health(&self, health_url: &str) -> Result<bool, ClientError> {
696 let response = self
697 .http
698 .get(health_url)
699 .send()
700 .await
701 .map_err(|error| ClientError::Transport(error.to_string()))?;
702 Ok(response.status() == StatusCode::OK)
703 }
704
705 async fn reenable_webhook(
706 &self,
707 config: &ResolvedLinearClientConfig,
708 webhook_id: &str,
709 ) -> Result<(), ClientError> {
710 let envelope = self
711 .request_graphql(
712 config,
713 "mutation ReenableWebhook($id: String!, $input: WebhookUpdateInput!) { webhookUpdate(id: $id, input: $input) { success webhook { id enabled } } }",
714 json!({
715 "id": webhook_id,
716 "input": { "enabled": true },
717 }),
718 Some("ReenableWebhook"),
719 )
720 .await?;
721 let payload = envelope
722 .get("data")
723 .and_then(|value| value.get("webhookUpdate"))
724 .ok_or_else(|| {
725 ClientError::Transport(
726 "linear connector response missing `webhookUpdate` GraphQL field".to_string(),
727 )
728 })?;
729 if payload
730 .get("success")
731 .and_then(JsonValue::as_bool)
732 .unwrap_or(false)
733 {
734 Ok(())
735 } else {
736 Err(ClientError::Transport(
737 "linear connector webhook re-enable returned success = false".to_string(),
738 ))
739 }
740 }
741}
742
743impl ActivatedLinearBinding {
744 fn from_binding(binding: &TriggerBinding) -> Result<Self, ConnectorError> {
745 let config: LinearBindingConfig =
746 serde_json::from_value(binding.config.clone()).map_err(|error| {
747 ConnectorError::Activation(format!(
748 "linear binding `{}` has invalid config: {error}",
749 binding.binding_id
750 ))
751 })?;
752 let signing_secret =
753 parse_secret_id(config.secrets.signing_secret.as_deref()).ok_or_else(|| {
754 ConnectorError::Activation(format!(
755 "linear binding `{}` requires secrets.signing_secret",
756 binding.binding_id
757 ))
758 })?;
759 let replay_grace_secs = config
760 .replay_grace_secs
761 .or(config.security.replay_grace_secs)
762 .or(config.security.timestamp_grace_secs)
763 .unwrap_or(DEFAULT_REPLAY_GRACE_SECS);
764 let monitor = config
765 .monitor
766 .map(|monitor| {
767 LinearWebhookMonitor::from_config(&binding.binding_id, monitor, &config.secrets)
768 })
769 .transpose()
770 .map_err(ConnectorError::Activation)?
771 .flatten();
772 Ok(Self {
773 binding_id: binding.binding_id.clone(),
774 path: config.match_config.path,
775 signing_secret,
776 replay_grace_secs,
777 monitor,
778 })
779 }
780}
781
782impl LinearWebhookMonitor {
783 fn from_config(
784 binding_id: &str,
785 config: LinearWebhookMonitorConfig,
786 binding_secrets: &LinearSecretsConfig,
787 ) -> Result<Option<Self>, String> {
788 if config.enabled == Some(false) {
789 return Ok(None);
790 }
791 let webhook_id = config
792 .webhook_id
793 .filter(|value| !value.trim().is_empty())
794 .ok_or_else(|| format!("linear binding `{binding_id}` monitor requires webhook_id"))?;
795 let health_url = config
796 .health_url
797 .filter(|value| !value.trim().is_empty())
798 .ok_or_else(|| format!("linear binding `{binding_id}` monitor requires health_url"))?;
799 let mut client = config.client;
800 if client.access_token.is_none()
801 && client.access_token_secret.is_none()
802 && client.api_key.is_none()
803 && client.api_key_secret.is_none()
804 && client.secrets.access_token.is_none()
805 && client.secrets.api_key.is_none()
806 {
807 client.secrets.access_token = binding_secrets.access_token.clone();
808 client.secrets.api_key = binding_secrets.api_key.clone();
809 }
810 let client_config = resolve_client_config_args(&client).map_err(|error| {
811 format!("linear binding `{binding_id}` monitor auth is invalid: {error}")
812 })?;
813 Ok(Some(Self {
814 webhook_id,
815 health_url,
816 probe_interval: config
817 .probe_interval_ms
818 .map(|ms| StdDuration::from_millis(ms.max(1)))
819 .unwrap_or_else(|| {
820 StdDuration::from_secs(
821 config
822 .probe_interval_secs
823 .unwrap_or(DEFAULT_WEBHOOK_MONITOR_PROBE_INTERVAL_SECS)
824 .max(1),
825 )
826 }),
827 success_threshold: config
828 .success_threshold
829 .unwrap_or(DEFAULT_WEBHOOK_MONITOR_SUCCESS_THRESHOLD)
830 .max(1),
831 client_config,
832 }))
833 }
834}
835
836async fn run_webhook_monitor(
837 client: Arc<LinearClient>,
838 monitor: LinearWebhookMonitor,
839 mut shutdown: watch::Receiver<bool>,
840) {
841 let mut consecutive_successes = 0u32;
842 loop {
843 tokio::select! {
844 changed = shutdown.changed() => {
845 if changed.is_err() || *shutdown.borrow() {
846 break;
847 }
848 }
849 _ = tokio::time::sleep(monitor.probe_interval) => {}
850 }
851 if *shutdown.borrow() {
852 break;
853 }
854 match client.probe_health(&monitor.health_url).await {
855 Ok(true) => {
856 consecutive_successes = consecutive_successes.saturating_add(1);
857 if consecutive_successes >= monitor.success_threshold
858 && client
859 .reenable_webhook(&monitor.client_config, &monitor.webhook_id)
860 .await
861 .is_ok()
862 {
863 consecutive_successes = 0;
864 }
865 }
866 Ok(false) | Err(_) => {
867 consecutive_successes = 0;
868 }
869 }
870 }
871}
872
873fn resolve_client_config_args(
874 args: &LinearClientConfigArgs,
875) -> Result<ResolvedLinearClientConfig, String> {
876 let auth = if let Some(secret_id) = args
877 .access_token_secret
878 .as_deref()
879 .or(args.secrets.access_token.as_deref())
880 .and_then(|value| parse_secret_id(Some(value)))
881 {
882 LinearAuthSource::AccessTokenSecret(secret_id)
883 } else if let Some(secret_id) = args
884 .api_key_secret
885 .as_deref()
886 .or(args.secrets.api_key.as_deref())
887 .and_then(|value| parse_secret_id(Some(value)))
888 {
889 LinearAuthSource::ApiKeySecret(secret_id)
890 } else if let Some(token) = args.access_token.clone() {
891 LinearAuthSource::AccessTokenInline(token)
892 } else if let Some(api_key) = args.api_key.clone() {
893 LinearAuthSource::ApiKeyInline(api_key)
894 } else {
895 return Err(
896 "linear connector requires access_token, access_token_secret, api_key, or api_key_secret"
897 .to_string(),
898 );
899 };
900
901 Ok(ResolvedLinearClientConfig {
902 api_base_url: args
903 .api_base_url
904 .clone()
905 .unwrap_or_else(|| DEFAULT_API_BASE_URL.to_string()),
906 auth,
907 })
908}
909
910fn effective_headers(headers: &BTreeMap<String, String>) -> BTreeMap<String, String> {
911 let mut effective = headers.clone();
912 for (raw, canonical) in [
913 ("content-type", "Content-Type"),
914 ("linear-signature", "Linear-Signature"),
915 ("linear-delivery", "Linear-Delivery"),
916 ("linear-event", "Linear-Event"),
917 ] {
918 if let Some(value) = header_value(headers, raw) {
919 effective
920 .entry(canonical.to_string())
921 .or_insert_with(|| value.to_string());
922 }
923 }
924 effective
925}
926
927fn linear_trigger_kind(payload: &JsonValue) -> String {
928 let event = payload
929 .get("type")
930 .and_then(JsonValue::as_str)
931 .map(|kind| {
932 let lower = kind.to_ascii_lowercase();
933 match lower.as_str() {
934 "issue" => "issue".to_string(),
935 "comment" | "issuecomment" | "issue_comment" => "comment".to_string(),
936 "issuelabel" | "issue_label" => "issue_label".to_string(),
937 "project" | "projectupdate" | "project_update" => "project".to_string(),
938 "cycle" => "cycle".to_string(),
939 "customer" => "customer".to_string(),
940 "customerrequest" | "customer_request" => "customer_request".to_string(),
941 _ => lower,
942 }
943 })
944 .unwrap_or_else(|| "other".to_string());
945 let action = payload
946 .get("action")
947 .and_then(JsonValue::as_str)
948 .unwrap_or("update");
949 format!("{event}.{action}")
950}
951
952fn infer_occurred_at(payload: &ProviderPayload) -> Option<OffsetDateTime> {
953 let ProviderPayload::Known(crate::triggers::event::KnownProviderPayload::Linear(payload)) =
954 payload
955 else {
956 return None;
957 };
958 let timestamp = match payload {
959 crate::triggers::event::LinearEventPayload::Issue(payload) => {
960 payload.common.webhook_timestamp
961 }
962 crate::triggers::event::LinearEventPayload::IssueComment(payload) => {
963 payload.common.webhook_timestamp
964 }
965 crate::triggers::event::LinearEventPayload::IssueLabel(payload) => {
966 payload.common.webhook_timestamp
967 }
968 crate::triggers::event::LinearEventPayload::Project(payload) => {
969 payload.common.webhook_timestamp
970 }
971 crate::triggers::event::LinearEventPayload::Cycle(payload) => {
972 payload.common.webhook_timestamp
973 }
974 crate::triggers::event::LinearEventPayload::Customer(payload) => {
975 payload.common.webhook_timestamp
976 }
977 crate::triggers::event::LinearEventPayload::CustomerRequest(payload) => {
978 payload.common.webhook_timestamp
979 }
980 crate::triggers::event::LinearEventPayload::Other(payload) => payload.webhook_timestamp,
981 }?;
982 OffsetDateTime::from_unix_timestamp_nanos(i128::from(timestamp) * 1_000_000).ok()
983}
984
985fn parse_args<T: for<'de> Deserialize<'de>>(args: JsonValue) -> Result<T, ClientError> {
986 serde_json::from_value(args).map_err(|error| ClientError::InvalidArgs(error.to_string()))
987}
988
989fn load_secret_text_blocking(
990 ctx: &ConnectorCtx,
991 secret_id: &SecretId,
992) -> Result<String, ConnectorError> {
993 let secret = futures::executor::block_on(ctx.secrets.get(secret_id))?;
994 secret.with_exposed(|bytes| {
995 std::str::from_utf8(bytes)
996 .map(|value| value.to_string())
997 .map_err(|error| {
998 ConnectorError::Secret(format!(
999 "secret '{}' is not valid UTF-8: {error}",
1000 secret_id
1001 ))
1002 })
1003 })
1004}
1005
1006fn parse_secret_id(raw: Option<&str>) -> Option<SecretId> {
1007 let trimmed = raw?.trim();
1008 if trimmed.is_empty() {
1009 return None;
1010 }
1011 let (base, version) = match trimmed.rsplit_once('@') {
1012 Some((base, version_text)) => {
1013 let version = version_text.parse::<u64>().ok()?;
1014 (base, SecretVersion::Exact(version))
1015 }
1016 None => (trimmed, SecretVersion::Latest),
1017 };
1018 let (namespace, name) = base.split_once('/')?;
1019 if namespace.is_empty() || name.is_empty() {
1020 return None;
1021 }
1022 Some(SecretId::new(namespace, name).with_version(version))
1023}
1024
/// Looks up a header by name, ignoring ASCII case.
///
/// When several keys differ only by case, the first one in the map's key
/// order is returned.
fn header_value<'a>(headers: &'a BTreeMap<String, String>, name: &str) -> Option<&'a str> {
    for (key, value) in headers {
        if key.eq_ignore_ascii_case(name) {
            return Some(value.as_str());
        }
    }
    None
}
1031
1032fn fallback_body_digest(body: &[u8]) -> String {
1033 use sha2::Digest;
1034
1035 let digest = sha2::Sha256::digest(body);
1036 let mut encoded = String::with_capacity(digest.len() * 2);
1037 for byte in digest {
1038 encoded.push_str(&format!("{byte:02x}"));
1039 }
1040 format!("sha256:{encoded}")
1041}
1042
1043fn graphql_meta(headers: &reqwest::header::HeaderMap, query: &str) -> JsonValue {
1044 let estimated = estimate_query_complexity(query);
1045 let observed = header_i64(headers, "x-complexity");
1046 let complexity = observed.or(estimated);
1047 let complexity_warning = complexity.is_some_and(|value| value >= COMPLEXITY_WARNING_THRESHOLD);
1048 json!({
1049 "complexity_estimate": estimated,
1050 "observed_complexity": observed,
1051 "complexity_warning": complexity_warning,
1052 "rate_limit": {
1053 "requests_limit": header_i64(headers, "x-ratelimit-requests-limit"),
1054 "requests_remaining": header_i64(headers, "x-ratelimit-requests-remaining"),
1055 "requests_reset": header_i64(headers, "x-ratelimit-requests-reset"),
1056 "complexity_limit": header_i64(headers, "x-ratelimit-complexity-limit"),
1057 "complexity_remaining": header_i64(headers, "x-ratelimit-complexity-remaining"),
1058 "complexity_reset": header_i64(headers, "x-ratelimit-complexity-reset"),
1059 },
1060 })
1061}
1062
1063fn header_i64(headers: &reqwest::header::HeaderMap, name: &str) -> Option<i64> {
1064 headers
1065 .get(name)
1066 .and_then(|value| value.to_str().ok())
1067 .and_then(|value| value.parse::<i64>().ok())
1068}
1069
/// Computes a rough, purely textual complexity score for a GraphQL document.
///
/// Returns `None` for a blank query. Otherwise the score is the sum of:
/// * 1 as a base,
/// * the number of `{` and `}` characters (at least 1),
/// * for every `first:` / `last:` occurrence, the integer literal that
///   follows it, or 50 when no literal follows (e.g. a variable),
/// * 50 per occurrence of the text `nodes`.
///
/// All arithmetic saturates at `i64::MAX`: the query text is caller-supplied,
/// so a huge page-size literal must not overflow.
fn estimate_query_complexity(query: &str) -> Option<i64> {
    let normalized = query.trim();
    if normalized.is_empty() {
        return None;
    }
    let occurrences =
        |needle: &str| i64::try_from(normalized.matches(needle).count()).unwrap_or(i64::MAX);

    let brace_count = occurrences("{").saturating_add(occurrences("}"));
    let mut score = 1i64.saturating_add(brace_count.max(1));

    for keyword in ["first:", "last:"] {
        let mut rest = normalized;
        while let Some(index) = rest.find(keyword) {
            let after = &rest[index + keyword.len()..];
            let digits: String = after
                .chars()
                .skip_while(|ch| ch.is_whitespace())
                .take_while(char::is_ascii_digit)
                .collect();
            // No literal (or one too large for i64) counts as a default page of 50.
            score = score.saturating_add(digits.parse::<i64>().unwrap_or(50));
            rest = after;
        }
    }

    Some(score.saturating_add(occurrences("nodes").saturating_mul(50)))
}
1099
1100fn extract_graphql_field(mut envelope: JsonValue, field: &str) -> Result<JsonValue, ClientError> {
1101 let meta = envelope.get("meta").cloned().unwrap_or(JsonValue::Null);
1102 let Some(data) = envelope.get_mut("data").and_then(JsonValue::as_object_mut) else {
1103 return Err(ClientError::Transport(
1104 "linear connector response missing GraphQL data".to_string(),
1105 ));
1106 };
1107 let mut extracted = data.remove(field).ok_or_else(|| {
1108 ClientError::Transport(format!(
1109 "linear connector response missing `{field}` GraphQL field"
1110 ))
1111 })?;
1112 if let Some(object) = extracted.as_object_mut() {
1113 object.insert("meta".to_string(), meta);
1114 }
1115 Ok(extracted)
1116}
1117
1118fn is_rate_limited(status: StatusCode, errors: Option<&JsonValue>) -> bool {
1119 if status == StatusCode::TOO_MANY_REQUESTS {
1120 return true;
1121 }
1122 if status == StatusCode::BAD_REQUEST {
1123 return graphql_error_codes(errors)
1124 .into_iter()
1125 .any(|code| code.eq_ignore_ascii_case("RATELIMITED"));
1126 }
1127 false
1128}
1129
1130fn graphql_error_codes(errors: Option<&JsonValue>) -> Vec<String> {
1131 errors
1132 .and_then(JsonValue::as_array)
1133 .map(|errors| {
1134 errors
1135 .iter()
1136 .filter_map(|error| {
1137 error
1138 .get("extensions")
1139 .and_then(JsonValue::as_object)
1140 .and_then(|extensions| extensions.get("code"))
1141 .and_then(JsonValue::as_str)
1142 .map(ToString::to_string)
1143 })
1144 .collect()
1145 })
1146 .unwrap_or_default()
1147}
1148
1149fn graphql_error_message(status: StatusCode, errors: Option<&JsonValue>) -> String {
1150 let messages = errors
1151 .and_then(JsonValue::as_array)
1152 .map(|errors| {
1153 errors
1154 .iter()
1155 .filter_map(|error| {
1156 error
1157 .get("message")
1158 .and_then(JsonValue::as_str)
1159 .map(ToString::to_string)
1160 })
1161 .collect::<Vec<_>>()
1162 })
1163 .unwrap_or_default();
1164 if messages.is_empty() {
1165 format!("linear GraphQL request failed with status {status}")
1166 } else {
1167 format!(
1168 "linear GraphQL request failed with status {status}: {}",
1169 messages.join("; ")
1170 )
1171 }
1172}