canic_core/access/
auth.rs1use crate::{
15 access::AccessError,
16 cdk::{
17 api::{canister_self, is_controller as caller_is_controller, msg_arg_data},
18 candid::de::IDLDeserialize,
19 types::Principal,
20 },
21 config::Config,
22 dto::{auth::DelegatedToken, rpc::AuthenticatedRequest},
23 ids::CanisterRole,
24 ops::{
25 auth::{DelegatedTokenOps, VerifiedDelegatedToken},
26 ic::IcOps,
27 runtime::env::EnvOps,
28 storage::{children::CanisterChildrenOps, registry::subnet::SubnetRegistryOps},
29 },
30};
31
32const MAX_INGRESS_BYTES: usize = 64 * 1024; pub type Role = CanisterRole;
35
36pub async fn authenticated(_caller: Principal) -> Result<(), AccessError> {
44 let _ = delegated_token_verified(_caller).await?;
45 Ok(())
46}
47
48pub(crate) async fn delegated_token_verified(
49 _caller: Principal,
50) -> Result<VerifiedDelegatedToken, AccessError> {
51 if should_skip_delegated_auth() {
52 return Ok(VerifiedDelegatedToken::dev_bypass());
54 }
55
56 let token = delegated_token_from_args()?;
57
58 let authority_pid =
59 EnvOps::root_pid().map_err(|_| dependency_unavailable("root pid unavailable"))?;
60
61 let now_secs = IcOps::now_secs();
62
63 verify_token(token, authority_pid, now_secs).await
64}
65
66#[allow(clippy::unused_async)]
68async fn verify_token(
69 token: DelegatedToken,
70 authority_pid: Principal,
71 now_secs: u64,
72) -> Result<VerifiedDelegatedToken, AccessError> {
73 let verified = DelegatedTokenOps::verify_token(&token, authority_pid, now_secs)
74 .map_err(|err| AccessError::Denied(err.to_string()))?;
75
76 Ok(verified)
77}
78
79#[allow(clippy::unused_async)]
86pub async fn is_controller(caller: Principal) -> Result<(), AccessError> {
87 if caller_is_controller(&caller) {
88 Ok(())
89 } else {
90 Err(AccessError::Denied(format!(
91 "caller '{caller}' is not a controller of this canister"
92 )))
93 }
94}
95
96#[allow(clippy::unused_async)]
99pub async fn is_whitelisted(caller: Principal) -> Result<(), AccessError> {
100 let cfg = Config::try_get().ok_or_else(|| dependency_unavailable("config not initialized"))?;
101
102 if !cfg.is_whitelisted(&caller) {
103 return Err(AccessError::Denied(format!(
104 "caller '{caller}' is not on the whitelist"
105 )));
106 }
107
108 Ok(())
109}
110
111#[allow(clippy::unused_async)]
113pub async fn is_child(caller: Principal) -> Result<(), AccessError> {
114 if CanisterChildrenOps::contains_pid(&caller) {
115 Ok(())
116 } else {
117 Err(AccessError::Denied(format!(
118 "caller '{caller}' is not a child of this canister"
119 )))
120 }
121}
122
123#[allow(clippy::unused_async)]
125pub async fn is_parent(caller: Principal) -> Result<(), AccessError> {
126 let snapshot = EnvOps::snapshot();
127 let parent_pid = snapshot
128 .parent_pid
129 .ok_or_else(|| dependency_unavailable("parent pid unavailable"))?;
130
131 if parent_pid == caller {
132 Ok(())
133 } else {
134 Err(AccessError::Denied(format!(
135 "caller '{caller}' is not the parent of this canister"
136 )))
137 }
138}
139
140#[allow(clippy::unused_async)]
142pub async fn is_root(caller: Principal) -> Result<(), AccessError> {
143 let root_pid =
144 EnvOps::root_pid().map_err(|_| dependency_unavailable("root pid unavailable"))?;
145
146 if caller == root_pid {
147 Ok(())
148 } else {
149 Err(AccessError::Denied(format!(
150 "caller '{caller}' is not root"
151 )))
152 }
153}
154
155#[allow(clippy::unused_async)]
157pub async fn is_same_canister(caller: Principal) -> Result<(), AccessError> {
158 if caller == canister_self() {
159 Ok(())
160 } else {
161 Err(AccessError::Denied(format!(
162 "caller '{caller}' is not the current canister"
163 )))
164 }
165}
166
167#[allow(clippy::unused_async)]
173pub async fn has_role(caller: Principal, role: Role) -> Result<(), AccessError> {
174 let record = SubnetRegistryOps::get(caller).ok_or_else(|| {
175 AccessError::Denied(format!(
176 "caller '{caller}' is not registered on the subnet registry"
177 ))
178 })?;
179
180 if record.role == role {
181 Ok(())
182 } else {
183 Err(AccessError::Denied(format!(
184 "caller '{caller}' does not have role '{role}'"
185 )))
186 }
187}
188
189#[allow(clippy::unused_async)]
192pub async fn is_registered_to_subnet(caller: Principal) -> Result<(), AccessError> {
193 if SubnetRegistryOps::is_registered(caller) {
194 Ok(())
195 } else {
196 Err(AccessError::Denied(format!(
197 "caller '{caller}' is not registered on the subnet registry"
198 )))
199 }
200}
201
202fn delegated_token_from_args() -> Result<DelegatedToken, AccessError> {
203 let bytes = msg_arg_data();
204
205 if bytes.len() > MAX_INGRESS_BYTES {
206 return Err(AccessError::Denied(
207 "delegated token payload exceeds size limit".to_string(),
208 ));
209 }
210
211 let mut token_decoder = IDLDeserialize::new(&bytes)
212 .map_err(|err| AccessError::Denied(format!("failed to decode ingress arguments: {err}")))?;
213
214 if let Ok(token) = token_decoder.get_value::<DelegatedToken>() {
216 return Ok(token);
217 }
218
219 let mut envelope_decoder = IDLDeserialize::new(&bytes)
220 .map_err(|err| AccessError::Denied(format!("failed to decode ingress arguments: {err}")))?;
221
222 let envelope = envelope_decoder
223 .get_value::<AuthenticatedRequest>()
224 .map_err(|err| {
225 AccessError::Denied(format!(
226 "failed to decode delegated token as first argument: {err}"
227 ))
228 })?;
229
230 Ok(envelope.delegated_token)
231}
232
233fn dependency_unavailable(detail: &str) -> AccessError {
234 AccessError::Denied(format!("access dependency unavailable: {detail}"))
235}
236
237fn should_skip_delegated_auth() -> bool {
238 should_skip_delegated_auth_for(option_env!("DFX_NETWORK"))
239}
240
/// Decides whether delegated-token verification is bypassed for `network`.
///
/// Returns `true` when `network` is `None` or exactly `Some("local")`, and
/// `false` for every other value (the comparison is case-sensitive).
fn should_skip_delegated_auth_for(network: Option<&str>) -> bool {
    network.is_none() || network == Some("local")
}
244
#[cfg(test)]
mod tests {
    use super::should_skip_delegated_auth_for;

    /// Pins the bypass decision for each build-time network value.
    #[test]
    fn delegated_auth_bypass_matrix() {
        // (network value, expected bypass decision)
        let cases = [
            (Some("local"), true),
            (Some("ic"), false),
            (None, true),
            (Some("nope"), false),
        ];

        for (network, expected) in cases {
            assert_eq!(should_skip_delegated_auth_for(network), expected);
        }
    }
}