use std::borrow::Cow;
pub use content_security_policy::InlineCheckType;
pub use content_security_policy::Violation;
use content_security_policy::{
CheckResult, CspList, Destination, Element as CspElement, Initiator, NavigationCheckType,
Origin, ParserMetadata, PolicyDisposition, PolicySource, Request, Response as CspResponse,
ViolationResource,
};
use http::header::{HeaderMap, HeaderValue, ValueIter};
use hyper_serde::Serde;
use js::rust::describe_scripted_caller;
use log::warn;
use servo_constellation_traits::{LoadData, LoadOrigin};
use url::Url;
use super::csppolicyviolationreport::CSPViolationReportBuilder;
use crate::dom::bindings::codegen::Bindings::WindowBinding::WindowMethods;
use crate::dom::bindings::codegen::UnionTypes::TrustedScriptOrString;
use crate::dom::bindings::inheritance::Castable;
use crate::dom::bindings::refcounted::Trusted;
use crate::dom::bindings::root::DomRoot;
use crate::dom::element::Element;
use crate::dom::globalscope::GlobalScope;
use crate::dom::node::{Node, NodeTraits};
use crate::dom::reporting::reportingobserver::ReportingObserver;
use crate::dom::security::cspviolationreporttask::CSPViolationReportTask;
use crate::dom::trustedtypes::trustedscript::TrustedScript;
use crate::dom::window::Window;
pub(crate) trait CspReporting {
fn is_js_evaluation_allowed(&self, global: &GlobalScope, source: &str) -> bool;
fn is_wasm_evaluation_allowed(&self, global: &GlobalScope) -> bool;
fn should_navigation_request_be_blocked(
&self,
cx: &mut js::context::JSContext,
global: &GlobalScope,
load_data: &mut LoadData,
element: Option<&Element>,
) -> bool;
fn should_navigation_response_to_navigation_request_be_blocked(
&self,
window: &Window,
url: Url,
self_origin: &url::Origin,
) -> bool;
fn should_elements_inline_type_behavior_be_blocked(
&self,
global: &GlobalScope,
el: &Element,
type_: InlineCheckType,
source: &str,
current_line: u32,
) -> bool;
fn is_trusted_type_policy_creation_allowed(
&self,
global: &GlobalScope,
policy_name: &str,
created_policy_names: &[&str],
) -> bool;
fn does_sink_type_require_trusted_types(
&self,
sink_group: &str,
include_report_only_policies: bool,
) -> bool;
fn should_sink_type_mismatch_violation_be_blocked_by_csp(
&self,
global: &GlobalScope,
sink: &str,
sink_group: &str,
source: &str,
) -> bool;
fn is_base_allowed_for_document(
&self,
global: &GlobalScope,
base: &url::Url,
self_origin: &url::Origin,
) -> bool;
fn concatenate(self, new_csp_list: Option<CspList>) -> Option<CspList>;
}
impl CspReporting for Option<CspList> {
fn is_js_evaluation_allowed(&self, global: &GlobalScope, source: &str) -> bool {
let Some(csp_list) = self else {
return true;
};
let (is_js_evaluation_allowed, violations) = csp_list.is_js_evaluation_allowed(source);
global.report_csp_violations(violations, None, None);
is_js_evaluation_allowed == CheckResult::Allowed
}
fn is_wasm_evaluation_allowed(&self, global: &GlobalScope) -> bool {
let Some(csp_list) = self else {
return true;
};
let (is_wasm_evaluation_allowed, violations) = csp_list.is_wasm_evaluation_allowed();
global.report_csp_violations(violations, None, None);
is_wasm_evaluation_allowed == CheckResult::Allowed
}
fn should_navigation_request_be_blocked(
&self,
cx: &mut js::context::JSContext,
global: &GlobalScope,
load_data: &mut LoadData,
element: Option<&Element>,
) -> bool {
let Some(csp_list) = self else {
return false;
};
let mut request = Request {
url: load_data.url.clone().into_url(),
current_url: load_data.url.clone().into_url(),
origin: match &load_data.load_origin {
LoadOrigin::Script(origin) => origin.immutable().clone().into_url_origin(),
_ => Origin::new_opaque(),
},
redirect_count: 0,
destination: Destination::None,
initiator: Initiator::None,
nonce: "".to_owned(),
integrity_metadata: "".to_owned(),
parser_metadata: ParserMetadata::None,
};
let (result, violations) = csp_list.should_navigation_request_be_blocked(
&mut request,
NavigationCheckType::Other,
|script_source| {
TrustedScript::get_trusted_type_compliant_string(
cx,
global,
TrustedScriptOrString::String(script_source.into()),
"Location href",
)
.ok()
.map(|s| s.into())
},
);
load_data.url = request.url.into();
global.report_csp_violations(violations, element, None);
result == CheckResult::Blocked
}
fn should_navigation_response_to_navigation_request_be_blocked(
&self,
window: &Window,
url: Url,
self_origin: &url::Origin,
) -> bool {
let Some(csp_list) = self else {
return false;
};
let mut window_proxy = window.window_proxy();
let mut parent_navigable_origins = vec![];
loop {
if let Some(container_element) = window_proxy.frame_element() {
let container_document = container_element.owner_document();
let parent_origin = Url::parse(
&container_document
.origin()
.immutable()
.ascii_serialization(),
)
.expect("Must always be able to parse document origin");
parent_navigable_origins.push(parent_origin);
window_proxy = container_document.window().window_proxy();
continue;
}
if let Some(parent_proxy) = window_proxy.parent() {
let Some(parent_origin) = parent_proxy.document_origin() else {
break;
};
let parent_origin = Url::parse(&parent_origin)
.expect("Must always be able to parse document origin");
parent_navigable_origins.push(parent_origin);
window_proxy = DomRoot::from_ref(parent_proxy);
continue;
}
break;
}
let (is_navigation_response_blocked, violations) = csp_list
.should_navigation_response_to_navigation_request_be_blocked(
&CspResponse {
url,
redirect_count: 0,
},
self_origin,
&parent_navigable_origins,
);
window
.as_global_scope()
.report_csp_violations(violations, None, None);
is_navigation_response_blocked == CheckResult::Blocked
}
fn should_elements_inline_type_behavior_be_blocked(
&self,
global: &GlobalScope,
el: &Element,
type_: InlineCheckType,
source: &str,
current_line: u32,
) -> bool {
let Some(csp_list) = self else {
return false;
};
let element = CspElement {
nonce: if el.is_nonceable() {
Some(Cow::Owned(el.nonce_value().trim().to_owned()))
} else {
None
},
};
let (result, violations) =
csp_list.should_elements_inline_type_behavior_be_blocked(&element, type_, source);
let source_position = el.compute_source_position(current_line.saturating_sub(2).max(1));
global.report_csp_violations(violations, Some(el), Some(source_position));
result == CheckResult::Blocked
}
fn is_trusted_type_policy_creation_allowed(
&self,
global: &GlobalScope,
policy_name: &str,
created_policy_names: &[&str],
) -> bool {
let Some(csp_list) = self else {
return true;
};
let (allowed_by_csp, violations) =
csp_list.is_trusted_type_policy_creation_allowed(policy_name, created_policy_names);
global.report_csp_violations(violations, None, None);
allowed_by_csp == CheckResult::Allowed
}
fn does_sink_type_require_trusted_types(
&self,
sink_group: &str,
include_report_only_policies: bool,
) -> bool {
let Some(csp_list) = self else {
return false;
};
csp_list.does_sink_type_require_trusted_types(sink_group, include_report_only_policies)
}
fn should_sink_type_mismatch_violation_be_blocked_by_csp(
&self,
global: &GlobalScope,
sink: &str,
sink_group: &str,
source: &str,
) -> bool {
let Some(csp_list) = self else {
return false;
};
let (allowed_by_csp, violations) = csp_list
.should_sink_type_mismatch_violation_be_blocked_by_csp(sink, sink_group, source);
global.report_csp_violations(violations, None, None);
allowed_by_csp == CheckResult::Blocked
}
fn is_base_allowed_for_document(
&self,
global: &GlobalScope,
base: &url::Url,
self_origin: &url::Origin,
) -> bool {
let Some(csp_list) = self else {
return true;
};
let (is_base_allowed, violations) =
csp_list.is_base_allowed_for_document(base, self_origin);
global.report_csp_violations(violations, None, None);
is_base_allowed == CheckResult::Allowed
}
fn concatenate(self, new_csp_list: Option<CspList>) -> Option<CspList> {
let Some(new_csp_list) = new_csp_list else {
return self;
};
match self {
None => Some(new_csp_list),
Some(mut old_csp_list) => {
old_csp_list.append(new_csp_list);
Some(old_csp_list)
},
}
}
}
pub(crate) struct SourcePosition {
pub(crate) source_file: String,
pub(crate) line_number: u32,
pub(crate) column_number: u32,
}
pub(crate) trait GlobalCspReporting {
fn report_csp_violations(
&self,
violations: Vec<Violation>,
element: Option<&Element>,
source_position: Option<SourcePosition>,
);
}
#[expect(unsafe_code)]
fn compute_scripted_caller_source_position() -> SourcePosition {
match unsafe { describe_scripted_caller(*GlobalScope::get_cx()) } {
Ok(scripted_caller) => SourcePosition {
source_file: scripted_caller.filename,
line_number: scripted_caller.line,
column_number: scripted_caller.col + 1,
},
Err(()) => SourcePosition {
source_file: String::new(),
line_number: 0,
column_number: 0,
},
}
}
fn obtain_blocked_uri_for_violation_resource_with_sample(
resource: ViolationResource,
) -> (Option<String>, String) {
match resource {
ViolationResource::Inline { sample } => (sample, "inline".to_owned()),
ViolationResource::Url(url) => (
Some(String::new()),
ReportingObserver::strip_url_for_reports(url.into()),
),
ViolationResource::TrustedTypePolicy { sample } => {
(Some(sample), "trusted-types-policy".to_owned())
},
ViolationResource::TrustedTypeSink { sample } => {
(Some(sample), "trusted-types-sink".to_owned())
},
ViolationResource::Eval { sample } => (sample, "eval".to_owned()),
ViolationResource::WasmEval => (None, "wasm-eval".to_owned()),
}
}
impl GlobalCspReporting for GlobalScope {
fn report_csp_violations(
&self,
violations: Vec<Violation>,
element: Option<&Element>,
source_position: Option<SourcePosition>,
) {
if violations.is_empty() {
return;
}
warn!("Reporting CSP violations: {:?}", violations);
let source_position =
source_position.unwrap_or_else(compute_scripted_caller_source_position);
for violation in violations {
let (sample, resource) =
obtain_blocked_uri_for_violation_resource_with_sample(violation.resource);
let report = CSPViolationReportBuilder::default()
.resource(resource)
.sample(sample)
.effective_directive(violation.directive.name)
.original_policy(violation.policy.to_string())
.report_only(violation.policy.disposition == PolicyDisposition::Report)
.source_file(source_position.source_file.clone())
.line_number(source_position.line_number)
.column_number(source_position.column_number)
.build(self);
let target = element.and_then(|event_target| {
if let Some(window) = self.downcast::<Window>() {
if event_target.upcast::<Node>().owner_document() != window.Document() {
return None;
}
}
Some(event_target)
});
let target = match target {
None => {
if let Some(window) = self.downcast::<Window>() {
Trusted::new(window.Document().upcast())
} else {
Trusted::new(self.upcast())
}
},
Some(event_target) => Trusted::new(event_target.upcast()),
};
let task =
CSPViolationReportTask::new(Trusted::new(self), target, report, violation.policy);
self.task_manager()
.dom_manipulation_task_source()
.queue(task);
}
}
}
fn parse_and_potentially_append_to_csp_list(
old_csp_list: Option<CspList>,
csp_header_iter: ValueIter<HeaderValue>,
disposition: PolicyDisposition,
) -> Option<CspList> {
let mut csp_list = old_csp_list;
for header in csp_header_iter {
let new_csp_list = header
.to_str()
.ok()
.map(|value| CspList::parse(value, PolicySource::Header, disposition));
csp_list = csp_list.concatenate(new_csp_list);
}
csp_list
}
pub(crate) fn parse_csp_list_from_metadata(headers: &Option<Serde<HeaderMap>>) -> Option<CspList> {
let headers = headers.as_ref()?;
let csp_enforce_list = parse_and_potentially_append_to_csp_list(
None,
headers.get_all("content-security-policy").iter(),
PolicyDisposition::Enforce,
);
parse_and_potentially_append_to_csp_list(
csp_enforce_list,
headers
.get_all("content-security-policy-report-only")
.iter(),
PolicyDisposition::Report,
)
}