1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
//! GPG related functions and types.
use crate::repository::PyRepository;
use crate::RevisionId;
use pyo3::import_exception;
use pyo3::prelude::*;
/// Failures reported by the GPG integration.
#[derive(Debug)]
pub enum Error {
    /// GPG is not installed on the system.
    GPGNotInstalled,
}
/// The ways in which a GPG signature can be produced.
#[derive(Debug)]
pub enum Mode {
    /// Normal signing mode.
    Normal,
    /// Detached signature mode.
    Detach,
    /// Clear signature mode.
    Clear,
}
/// Outcome of checking a GPG signature.
#[derive(Debug)]
pub enum Status {
    /// The signature is valid.
    Valid,
    /// The signing key (carried as the payload) is missing from the keyring.
    KeyMissing(String),
    /// The signature made with the carried key is not valid.
    NotValid(String),
    /// The content is not signed.
    NotSigned,
    /// The signing key (carried as the payload) has expired.
    Expired(String),
}
// Rust-side handle for the Python exception class `breezy.gpg.GPGNotInstalled`.
import_exception!(breezy.gpg, GPGNotInstalled);

impl From<PyErr> for Error {
    /// Translate a Python exception into an [`Error`].
    ///
    /// # Panics
    ///
    /// Panics when the exception is anything other than
    /// `breezy.gpg.GPGNotInstalled`.
    fn from(err: PyErr) -> Self {
        Python::attach(|py| {
            // Guard clause: only one Python exception type has a Rust counterpart.
            if !err.is_instance_of::<GPGNotInstalled>(py) {
                panic!("unexpected exception: {:?}", err);
            }
            Error::GPGNotInstalled
        })
    }
}
/// Strategy for handling GPG signatures.
///
/// Thin wrapper around a Python object. [`GPGStrategy::new`] obtains it by
/// calling `breezy.gpg.GPGStrategy`; extracting a `GPGStrategy` from a Python
/// object wraps that object as-is, without a type check.
pub struct GPGStrategy(Py<PyAny>);
impl GPGStrategy {
fn to_object(&self) -> &Py<PyAny> {
&self.0
}
/// Create a new GPG strategy with the given branch configuration.
pub fn new(branch_config: &crate::config::BranchConfig) -> Self {
Python::attach(|py| {
let gpg = PyModule::import(py, "breezy.gpg").unwrap();
let gpg_strategy = gpg.getattr("GPGStrategy").unwrap();
let branch_config = branch_config.clone().into_pyobject(py).unwrap().unbind();
let strategy = gpg_strategy.call1((branch_config,)).unwrap();
GPGStrategy(strategy.unbind())
})
}
/// Set the GPG keys that are acceptable for validating signatures.
pub fn set_acceptable_keys(&self, keys: &[String]) {
Python::attach(|py| {
self.0
.call_method1(py, "set_acceptable_keys", (keys.join(","),))
.unwrap();
})
}
}
impl<'py> IntoPyObject<'py> for GPGStrategy {
type Target = PyAny;
type Output = Bound<'py, Self::Target>;
type Error = std::convert::Infallible;
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
Ok(self.0.into_bound(py))
}
}
impl<'a, 'py> FromPyObject<'a, 'py> for GPGStrategy {
type Error = PyErr;
fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
Ok(GPGStrategy(ob.to_owned().unbind()))
}
}
/// Outcome of verifying a GPG signature, carrying the key where one applies.
#[derive(Debug)]
pub enum VerificationResult {
    /// The signature is valid; the payload is the key.
    Valid(String),
    /// The signature was made with a key that is missing from the keyring.
    KeyMissing(String),
    /// The signature made with the given key is not valid.
    NotValid(String),
    /// The content carries no GPG signature.
    NotSigned,
    /// The signature was made with a key that has expired.
    Expired(String),
}
impl VerificationResult {
/// Returns the key string for the signature if available.
pub fn key(&self) -> Option<&str> {
match self {
VerificationResult::Valid(key) => Some(key),
VerificationResult::KeyMissing(key) => Some(key),
VerificationResult::NotValid(key) => Some(key),
VerificationResult::Expired(key) => Some(key),
_ => None,
}
}
/// Check if the verification result indicates a valid signature.
pub fn is_valid(&self) -> bool {
matches!(self, VerificationResult::Valid(_))
}
/// Check if the verification result indicates a missing key.
pub fn is_key_missing(&self) -> bool {
matches!(self, VerificationResult::KeyMissing(_))
}
/// Check if the verification result indicates an invalid signature.
pub fn is_not_valid(&self) -> bool {
matches!(self, VerificationResult::NotValid(_))
}
/// Check if the verification result indicates the content is not signed.
pub fn is_not_signed(&self) -> bool {
matches!(self, VerificationResult::NotSigned)
}
/// Check if the verification result indicates an expired key.
pub fn is_expired(&self) -> bool {
matches!(self, VerificationResult::Expired(_))
}
}
/// Bulk verify GPG signatures for a set of revisions.
///
/// # Arguments
///
/// * `repository` - The repository containing the revisions
/// * `revids` - List of revision IDs to verify signatures for
/// * `strategy` - GPG strategy to use for verification
///
/// # Returns
///
/// A vector of tuples containing revision IDs and their verification results
pub fn bulk_verify_signatures<R: PyRepository>(
repository: &R,
revids: &[&RevisionId],
strategy: &GPGStrategy,
) -> Result<Vec<(RevisionId, VerificationResult)>, Error> {
Python::attach(|py| {
let gpg = PyModule::import(py, "breezy.gpg").unwrap();
let bulk_verify_signatures = gpg.getattr("bulk_verify_signatures").unwrap();
let r = bulk_verify_signatures
.call1((
repository.to_object(py),
revids
.iter()
.map(|r| (*r).clone().into_pyobject(py).unwrap())
.collect::<Vec<_>>(),
strategy.to_object(),
))
.map_err(|e| -> Error { e.into() })
.unwrap();
let (_count, result, _all_verifiable) = r
.extract::<(Py<PyAny>, Vec<(RevisionId, isize, String)>, bool)>()
.unwrap();
let result: Vec<(RevisionId, VerificationResult)> = result
.into_iter()
.map(|(revid, status, key)| {
let status = match status {
0 => VerificationResult::Valid(key),
1 => VerificationResult::KeyMissing(key),
2 => VerificationResult::NotValid(key),
3 => VerificationResult::NotSigned,
4 => VerificationResult::Expired(key),
_ => panic!("unexpected status: {}", status),
};
(revid, status)
})
.collect::<Vec<_>>();
Ok(result)
})
}
/// Context for interacting with GPG.
///
/// Wraps the Python object produced by calling `Context()` from the Python
/// `gpg` module (see [`GPGContext::new`]).
pub struct GPGContext(Py<PyAny>);
/// Represents a GPG key.
///
/// When extracted from a Python object, only that object's `fpr` attribute is
/// read.
pub struct GPGKey {
    /// Fingerprint of the GPG key.
    pub fpr: String,
}
impl<'a, 'py> FromPyObject<'a, 'py> for GPGKey {
    type Error = PyErr;

    /// Build a [`GPGKey`] from a Python object by reading its `fpr` attribute.
    ///
    /// # Errors
    ///
    /// Returns the Python error if the object has no `fpr` attribute or if
    /// that attribute cannot be extracted as a string. These failures are
    /// returned rather than raised as panics, so callers decide how to react.
    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
        Ok(GPGKey {
            fpr: ob.getattr("fpr")?.extract()?,
        })
    }
}
impl GPGContext {
/// Create a new GPG context.
pub fn new() -> Self {
Python::attach(|py| {
let gpg = PyModule::import(py, "gpg").unwrap();
let gpg_context = gpg.getattr("Context").unwrap();
let context = gpg_context.call0().unwrap();
GPGContext(context.unbind())
})
}
/// List GPG keys.
///
/// # Arguments
///
/// * `secret` - If true, list only secret keys. Otherwise, list all keys.
///
/// # Returns
///
/// A vector of GPG keys.
pub fn keylist(&self, secret: bool) -> Vec<GPGKey> {
Python::attach(|py| {
self.0
.call_method1(py, "keylist", (secret,))
.unwrap()
.extract::<Vec<GPGKey>>(py)
.unwrap()
})
}
/// Export the minimal form of a GPG key.
///
/// # Arguments
///
/// * `key` - The key ID or fingerprint to export
///
/// # Returns
///
/// The exported key data as a byte vector.
pub fn key_export_minimal(&self, key: &str) -> Vec<u8> {
Python::attach(|py| {
self.0
.call_method1(py, "key_export_minimal", (key,))
.unwrap()
.extract::<Vec<u8>>(py)
.unwrap()
})
}
}